mirror of
synced 2025-03-25 13:27:58 +00:00
CORDA-530 Don't soft-lock non-fungible states (#1794)
* Don't run unlock query if nothing was locked * Constructors should not have side-effects
This commit is contained in:
@ -274,8 +274,7 @@ abstract class AbstractNode(config: NodeConfiguration,
require(customNotaryServiceList.size == 1) {
"Attempting to install more than one notary service: ${customNotaryServiceList.joinToString()}"
else return loadedServices - customNotaryServiceList
} else return loadedServices - customNotaryServiceList
return loadedServices
@ -490,9 +489,9 @@ abstract class AbstractNode(config: NodeConfiguration,
protected open fun makeTransactionStorage(): WritableTransactionStorage = DBTransactionStorage()
private fun makeVaultObservers() {
VaultSoftLockManager(services.vaultService, smm)
HibernateObserver(services.vaultService.rawUpdates, services.database.hibernateConfig)
VaultSoftLockManager.install(services.vaultService, smm)
ScheduledActivityObserver.install(services.vaultService, services.schedulerService)
HibernateObserver.install(services.vaultService.rawUpdates, database.hibernateConfig)
private fun makeInfo(legalIdentity: PartyAndCertificate): NodeInfo {
@ -758,6 +757,9 @@ abstract class AbstractNode(config: NodeConfiguration,
protected open fun generateKeyPair() = cryptoGenerateKeyPair()
protected open fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, stateLoader, database.hibernateConfig)
private inner class ServiceHubInternalImpl(
override val schemaService: SchemaService,
@ -771,7 +773,7 @@ abstract class AbstractNode(config: NodeConfiguration,
override val auditService = DummyAuditService()
override val transactionVerifierService by lazy { makeTransactionVerifierService() }
override val networkMapCache by lazy { PersistentNetworkMapCache(this) }
override val vaultService by lazy { NodeVaultService(platformClock, keyManagementService, stateLoader, this@AbstractNode.database.hibernateConfig) }
override val vaultService by lazy { makeVaultService(keyManagementService, stateLoader) }
override val contractUpgradeService by lazy { ContractUpgradeServiceImpl() }
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
@ -4,18 +4,28 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateAndRef
import net.corda.node.services.api.ServiceHubInternal
import net.corda.core.node.services.VaultService
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
* This observes the vault and schedules and unschedules activities appropriately based on state production and
* consumption.
class ScheduledActivityObserver(val services: ServiceHubInternal) {
init {
services.vaultService.rawUpdates.subscribe { (consumed, produced) ->
consumed.forEach { services.schedulerService.unscheduleStateActivity(it.ref) }
produced.forEach { scheduleStateActivity(it) }
class ScheduledActivityObserver private constructor(private val schedulerService: SchedulerService) {
companion object {
fun install(vaultService: VaultService, schedulerService: SchedulerService) {
val observer = ScheduledActivityObserver(schedulerService)
vaultService.rawUpdates.subscribe { (consumed, produced) ->
consumed.forEach { schedulerService.unscheduleStateActivity(it.ref) }
produced.forEach { observer.scheduleStateActivity(it) }
// TODO: Beware we are calling dynamically loaded contract code inside here.
private inline fun <T : Any> sandbox(code: () -> T?): T? {
return code()
@ -23,12 +33,7 @@ class ScheduledActivityObserver(val services: ServiceHubInternal) {
val producedState = produced.state.data
if (producedState is SchedulableState) {
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, FlowLogicRefFactoryImpl)?.scheduledAt } ?: return
services.schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt))
schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt))
// TODO: Beware we are calling dynamically loaded contract code inside here.
private inline fun <T : Any> sandbox(code: () -> T?): T? {
return code()
@ -3,6 +3,7 @@ package net.corda.node.services.schema
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.internal.VisibleForTesting
import net.corda.core.node.services.Vault
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentStateRef
@ -17,14 +18,15 @@ import rx.Observable
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
// TODO: Manage version evolution of the schemas via additional tooling.
class HibernateObserver(vaultUpdates: Observable<Vault.Update<ContractState>>, val config: HibernateConfiguration) {
class HibernateObserver private constructor(private val config: HibernateConfiguration) {
companion object {
val logger = loggerFor<HibernateObserver>()
init {
vaultUpdates.subscribe { persist(it.produced) }
private val log = loggerFor<HibernateObserver>()
fun install(vaultUpdates: Observable<Vault.Update<ContractState>>, config: HibernateConfiguration): HibernateObserver {
val observer = HibernateObserver(config)
vaultUpdates.subscribe { observer.persist(it.produced) }
return observer
private fun persist(produced: Set<StateAndRef<ContractState>>) {
@ -33,11 +35,12 @@ class HibernateObserver(vaultUpdates: Observable<Vault.Update<ContractState>>, v
private fun persistState(stateAndRef: StateAndRef<ContractState>) {
val state = stateAndRef.state.data
logger.debug { "Asked to persist state ${stateAndRef.ref}" }
log.debug { "Asked to persist state ${stateAndRef.ref}" }
config.schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
fun persistStateWithSchema(state: ContractState, stateRef: StateRef, schema: MappedSchema) {
internal fun persistStateWithSchema(state: ContractState, stateRef: StateRef, schema: MappedSchema) {
val sessionFactory = config.sessionFactoryForSchemas(setOf(schema))
val session = sessionFactory.withOptions().
@ -68,12 +68,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
* is not necessary.
override val logger: Logger = LoggerFactory.getLogger("net.corda.flow.$id")
@Transient private var _resultFuture: OpenFuture<R>? = openFuture()
@Transient private var resultFutureTransient: OpenFuture<R>? = openFuture()
private val _resultFuture get() = resultFutureTransient ?: openFuture<R>().also { resultFutureTransient = it }
/** This future will complete when the call method returns. */
override val resultFuture: CordaFuture<R>
get() = _resultFuture ?: openFuture<R>().also { _resultFuture = it }
override val resultFuture: CordaFuture<R> get() = _resultFuture
// This state IS serialised, as we need it to know what the fiber is waiting for.
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSessionInternal>()
internal var waitingForResponse: WaitingRequest? = null
@ -115,7 +113,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
// This is to prevent actionOnEnd being called twice if it throws an exception
actionOnEnd(Try.Success(result), false)
logic.progressTracker?.currentStep = ProgressTracker.DONE
logger.debug { "Flow finished with result ${result.toString().abbreviate(300)}" }
@ -128,7 +126,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private fun processException(exception: Throwable, propagated: Boolean) {
actionOnEnd(Try.Failure(exception), propagated)
@ -269,7 +269,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
update.where(stateStatusPredication, lockIdPredicate, *commonPredicates)
if (updatedRows > 0 && updatedRows == stateRefs.size) {
log.trace("Reserving soft lock states for $lockId: $stateRefs")
log.trace { "Reserving soft lock states for $lockId: $stateRefs" }
FlowStateMachineImpl.currentStateMachine()?.hasSoftLockedStates = true
} else {
// revert partial soft locks
@ -280,7 +280,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
update.where(lockUpdateTime, lockIdPredicate, *commonPredicates)
if (revertUpdatedRows > 0) {
log.trace("Reverting $revertUpdatedRows partially soft locked states for $lockId")
log.trace { "Reverting $revertUpdatedRows partially soft locked states for $lockId" }
throw StatesNotAvailableException("Attempted to reserve $stateRefs for $lockId but only $updatedRows rows available")
@ -309,7 +309,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
if (update > 0) {
log.trace("Releasing $update soft locked states for $lockId")
log.trace { "Releasing $update soft locked states for $lockId" }
} else {
try {
@ -320,7 +320,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic
update.where(*commonPredicates, stateRefsPredicate)
if (updatedRows > 0) {
log.trace("Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefs")
log.trace { "Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefs" }
} catch (e: Exception) {
log.error("""soft lock update error attempting to release states for $lockId and $stateRefs")
@ -1,8 +1,8 @@
package net.corda.node.services.vault
import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.services.VaultService
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.loggerFor
@ -12,50 +12,50 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import java.util.*
class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) {
private companion object {
val log = loggerFor<VaultSoftLockManager>()
init {
smm.changes.subscribe { change ->
if (change is StateMachineManager.Change.Removed && (FlowStateMachineImpl.currentStateMachine())?.hasSoftLockedStates == true) {
log.trace { "Remove flow name ${change.logic.javaClass} with id $change.id" }
unregisterSoftLocks(change.logic.runId, change.logic)
class VaultSoftLockManager private constructor(private val vault: VaultService) {
companion object {
private val log = loggerFor<VaultSoftLockManager>()
fun install(vault: VaultService, smm: StateMachineManager) {
val manager = VaultSoftLockManager(vault)
smm.changes.subscribe { change ->
if (change is StateMachineManager.Change.Removed) {
val logic = change.logic
// Don't run potentially expensive query if the flow didn't lock any states:
if ((logic.stateMachine as FlowStateMachineImpl<*>).hasSoftLockedStates) {
manager.unregisterSoftLocks(logic.runId.uuid, logic)
// Discussion
// The intent of the following approach is to support what might be a common pattern in a flow:
// 1. Create state
// 2. Do something with state
// without possibility of another flow intercepting the state between 1 and 2,
// since we cannot lock the state before it exists. e.g. Issue and then Move some Cash.
// The downside is we could have a long running flow that holds a lock for a long period of time.
// However, the lock can be programmatically released, like any other soft lock,
// should we want a long running flow that creates a visible state mid way through.
vault.rawUpdates.subscribe { (_, produced, flowId) ->
flowId?.let {
if (produced.isNotEmpty()) {
registerSoftLocks(flowId, (produced.map { it.ref }).toNonEmptySet())
// Discussion
// The intent of the following approach is to support what might be a common pattern in a flow:
// 1. Create state
// 2. Do something with state
// without possibility of another flow intercepting the state between 1 and 2,
// since we cannot lock the state before it exists. e.g. Issue and then Move some Cash.
// The downside is we could have a long running flow that holds a lock for a long period of time.
// However, the lock can be programmatically released, like any other soft lock,
// should we want a long running flow that creates a visible state mid way through.
vault.rawUpdates.subscribe { (_, produced, flowId) ->
if (flowId != null) {
val fungible = produced.filter { it.state.data is FungibleAsset<*> }
if (fungible.isNotEmpty()) {
manager.registerSoftLocks(flowId, fungible.map { it.ref }.toNonEmptySet())
private fun registerSoftLocks(flowId: UUID, stateRefs: NonEmptySet<StateRef>) {
log.trace("Reserving soft locks for flow id $flowId and states $stateRefs")
log.trace { "Reserving soft locks for flow id $flowId and states $stateRefs" }
vault.softLockReserve(flowId, stateRefs)
private fun unregisterSoftLocks(id: StateMachineRunId, logic: FlowLogic<*>) {
val flowClassName = logic.javaClass.simpleName
log.trace("Releasing soft locks for flow $flowClassName with flow id ${id.uuid}")
private fun unregisterSoftLocks(flowId: UUID, logic: FlowLogic<*>) {
log.trace { "Releasing soft locks for flow ${logic.javaClass.simpleName} with flow id $flowId" }
@ -45,7 +45,7 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
override val vaultService: VaultServiceInternal
get() {
val vaultService = NodeVaultService(clock, keyManagementService, stateLoader, database.hibernateConfig)
hibernatePersister = HibernateObserver(vaultService.rawUpdates, database.hibernateConfig)
hibernatePersister = HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig)
return vaultService
@ -69,8 +69,7 @@ class HibernateObserverTests {
val database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), ::makeTestIdentityService, schemaService)
val observer = HibernateObserver(rawUpdatesPublisher, database.hibernateConfig)
HibernateObserver.install(rawUpdatesPublisher, database.hibernateConfig)
database.transaction {
rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))))
val parentRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery()
@ -0,0 +1,161 @@
package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions
import net.corda.core.contracts.*
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.AbstractParty
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.packageName
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.StateLoader
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition
import net.corda.core.node.services.vault.QueryCriteria.SoftLockingType.LOCKED_ONLY
import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.testing.chooseIdentity
import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Test
import java.math.BigInteger
import java.security.KeyPair
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.reflect.jvm.jvmName
import kotlin.test.assertEquals
private class NodePair(private val mockNet: MockNetwork) {
private class ServerLogic(private val session: FlowSession, private val running: AtomicBoolean) : FlowLogic<Unit>() {
override fun call() {
session.receive<String>().unwrap { assertEquals("ping", it) }
abstract class AbstractClientLogic<out T>(nodePair: NodePair) : FlowLogic<T>() {
protected val server = nodePair.server.info.chooseIdentity()
protected abstract fun callImpl(): T
override fun call() = callImpl().also {
initiateFlow(server).sendAndReceive<String>("ping").unwrap { assertEquals("pong", it) }
private val serverRunning = AtomicBoolean()
val server = mockNet.createNode()
var client = mockNet.createNode().apply {
internals.disableDBCloseOnStop() // Otherwise the in-memory database may disappear (taking the checkpoint with it) while we reboot the client.
private set
fun <T> communicate(clientLogic: AbstractClientLogic<T>, rebootClient: Boolean): FlowStateMachine<T> {
server.internals.internalRegisterFlowFactory(AbstractClientLogic::class.java, InitiatedFlowFactory.Core { ServerLogic(it, serverRunning) }, ServerLogic::class.java, false)
while (!serverRunning.get()) mockNet.runNetwork(1)
if (rebootClient) {
client = mockNet.createNode(client.internals.id)
return uncheckedCast(client.smm.allStateMachines.single().stateMachine)
class VaultSoftLockManagerTest {
private val mockVault: VaultServiceInternal = mock()
private val mockNet = MockNetwork(cordappPackages = listOf(ContractImpl::class.packageName), defaultFactory = object : MockNetwork.Factory<MockNetwork.MockNode> {
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, id: Int, notaryIdentity: Pair<ServiceInfo, KeyPair>?, entropyRoot: BigInteger): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, id, notaryIdentity, entropyRoot) {
override fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader): VaultServiceInternal {
val realVault = super.makeVaultService(keyManagementService, stateLoader)
return object : VaultServiceInternal by realVault {
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests.
private val nodePair = NodePair(mockNet)
fun tearDown() {
private object CommandDataImpl : CommandData
private class ClientLogic(nodePair: NodePair, private val state: ContractState) : NodePair.AbstractClientLogic<List<ContractState>>(nodePair) {
override fun callImpl() = run {
subFlow(FinalityFlow(serviceHub.signInitialTransaction(TransactionBuilder(notary = ourIdentity).apply {
addOutputState(state, ContractImpl::class.jvmName)
addCommand(CommandDataImpl, ourIdentity.owningKey)
serviceHub.vaultService.queryBy<ContractState>(VaultQueryCriteria(softLockingCondition = SoftLockingCondition(LOCKED_ONLY))).states.map {
private abstract class SingleParticipantState(nodePair: NodePair) : ContractState {
override val participants = listOf(nodePair.client.info.chooseIdentity())
private class PlainOldState(nodePair: NodePair) : SingleParticipantState(nodePair)
private class FungibleAssetImpl(nodePair: NodePair) : SingleParticipantState(nodePair), FungibleAsset<Unit> {
override val owner get() = participants[0]
override fun withNewOwner(newOwner: AbstractParty) = throw UnsupportedOperationException()
override val amount get() = Amount(1, Issued(PartyAndReference(owner, OpaqueBytes.of(1)), Unit))
override val exitKeys get() = throw UnsupportedOperationException()
override fun withNewOwnerAndAmount(newAmount: Amount<Issued<Unit>>, newOwner: AbstractParty) = throw UnsupportedOperationException()
override fun equals(other: Any?) = other is FungibleAssetImpl && participants == other.participants
override fun hashCode() = participants.hashCode()
class ContractImpl : Contract {
override fun verify(tx: LedgerTransaction) {}
private fun run(expectSoftLock: Boolean, state: ContractState, checkpoint: Boolean) {
val fsm = nodePair.communicate(ClientLogic(nodePair, state), checkpoint)
if (expectSoftLock) {
assertEquals(listOf(state), fsm.resultFuture.getOrThrow())
verify(mockVault).softLockRelease(fsm.id.uuid, null)
} else {
assertEquals(emptyList(), fsm.resultFuture.getOrThrow())
// In this case we don't want softLockRelease called so that we avoid its expensive query, even after restore from checkpoint.
fun `plain old state is not soft locked`() = run(false, PlainOldState(nodePair), false)
fun `plain old state is not soft locked with checkpoint`() = run(false, PlainOldState(nodePair), true)
fun `fungible asset is soft locked`() = run(true, FungibleAssetImpl(nodePair), false)
fun `fungible asset is soft locked with checkpoint`() = run(true, FungibleAssetImpl(nodePair), true)
@ -171,7 +171,7 @@ open class MockServices(
fun makeVaultService(hibernateConfig: HibernateConfiguration): VaultServiceInternal {
val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, stateLoader, hibernateConfig)
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
hibernatePersister = HibernateObserver.install(vaultService.rawUpdates, hibernateConfig)
return vaultService
Reference in New Issue
Block a user