CORDA-3725 Fix SQL deadlocks coming from soft locking states (#6287)

Adding to the query -explicitly- the flow's locked states resolved the SQL Deadlocks in SQL server. The SQL Deadlocks would come up  from `softLockRelease` when only the `lockId` was passed in as argument. In that case the query optimizer would use `lock_id_idx(lock_id, state_status)` to search and update entries in `VAULT_STATES` table. 

However, all rest of the queries would follow the opposite direction meaning they would use PK's `index(output_index, transaction_id)` but they would also update the `lock_id` column and therefore the `lock_id_idx` as well, because `lock_id` is a part of it. That was causing a circular locking among the different transactions (SQL processes) within the database.

To resolve this, whenever a flow attempts to reserve soft locks using their flow id (remember the flow id is always the flow id for the very first flow in the flow stack), we then save these states to the fiber. Then, upon releasing soft locks the fiber passes that set to the release soft locks query. That way the database query optimizer will use the primary key index of VAULT_STATES table, instead of lock_id_idx in order to search rows to update. That way the query will be aligned with the rest of the queries that are following that route as well (i.e. making use of the primary key), and therefore its locking order of resources within the database will be aligned with the rest queries' locking orders (solving SQL deadlocks).

* Fixed SQL deadlocks caused from softLockRelease, by saving locked states per fiber; NodeVaultService.softLockRelease query then uses VAULT_STATES PK index instead of lock_id_idx

Speed up SQL server by breaking down queries with > 16 elements in their IN clause, into sub queries with 16 elements max in their IN clauses

* Allow softLockedStates to remove states

* Add to softLockedStates only states soft locked under our flow id

* Fix softLockRelease not to take into account flowStateMachineImpl.softLockedStates when using lockId != ourFlowId

* Moved CriteriaBuilder.executeUpdate at the bottom of the file
This commit is contained in:
Kyriakos Tharrouniatis 2020-05-29 12:35:05 +01:00 committed by GitHub
parent 598228634f
commit 4507b55857
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 452 additions and 23 deletions

View File

@ -8,6 +8,7 @@ import co.paralleluniverse.strands.Strand
import co.paralleluniverse.strands.channels.Channel import co.paralleluniverse.strands.channels.Channel
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationContext
import net.corda.core.contracts.StateRef
import net.corda.core.cordapp.Cordapp import net.corda.core.cordapp.Cordapp
import net.corda.core.flows.Destination import net.corda.core.flows.Destination
import net.corda.core.flows.FlowException import net.corda.core.flows.FlowException
@ -132,10 +133,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override val ourIdentity: Party get() = transientState!!.value.checkpoint.checkpointState.ourIdentity override val ourIdentity: Party get() = transientState!!.value.checkpoint.checkpointState.ourIdentity
override val isKilled: Boolean get() = transientState!!.value.isKilled override val isKilled: Boolean get() = transientState!!.value.isKilled
internal var hasSoftLockedStates: Boolean = false internal val softLockedStates = mutableSetOf<StateRef>()
set(value) {
if (value) field = value else throw IllegalArgumentException("Can only set to true")
}
/** /**
* Processes an event by creating the associated transition and executing it using the given executor. * Processes an event by creating the associated transition and executing it using the given executor.
@ -306,7 +304,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
logger.info("Flow raised an error: ${t.message}. Sending it to flow hospital to be triaged.") logger.info("Flow raised an error: ${t.message}. Sending it to flow hospital to be triaged.")
Try.Failure<R>(t) Try.Failure<R>(t)
} }
val softLocksId = if (hasSoftLockedStates) logic.runId.uuid else null val softLocksId = if (softLockedStates.isNotEmpty()) logic.runId.uuid else null
val finalEvent = when (resultOrError) { val finalEvent = when (resultOrError) {
is Try.Success -> { is Try.Success -> {
Event.FlowFinish(resultOrError.value, softLocksId) Event.FlowFinish(resultOrError.value, softLocksId)

View File

@ -42,11 +42,6 @@ import javax.persistence.criteria.CriteriaUpdate
import javax.persistence.criteria.Predicate import javax.persistence.criteria.Predicate
import javax.persistence.criteria.Root import javax.persistence.criteria.Root
private fun CriteriaBuilder.executeUpdate(session: Session, configure: Root<*>.(CriteriaUpdate<*>) -> Any?) = createCriteriaUpdate(VaultSchemaV1.VaultStates::class.java).let { update ->
update.from(VaultSchemaV1.VaultStates::class.java).run { configure(update) }
session.createQuery(update).executeUpdate()
}
/** /**
* The vault service handles storage, retrieval and querying of states. * The vault service handles storage, retrieval and querying of states.
* *
@ -67,6 +62,8 @@ class NodeVaultService(
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
val MAX_SQL_IN_CLAUSE_SET = 16
/** /**
* Establish whether a given state is relevant to a node, given the node's public keys. * Establish whether a given state is relevant to a node, given the node's public keys.
* *
@ -462,13 +459,20 @@ class NodeVaultService(
} }
} }
/**
* Whenever executed inside a [FlowStateMachineImpl], if [lockId] refers to the currently running [FlowStateMachineImpl],
* then in that case the [FlowStateMachineImpl] instance is locking states with its [FlowStateMachineImpl.id]'s [UUID].
* In this case alone, we keep the reserved set of [StateRef] in [FlowStateMachineImpl.softLockedStates]. This set will be then
* used by default in [softLockRelease].
*/
@Suppress("NestedBlockDepth", "ComplexMethod")
@Throws(StatesNotAvailableException::class) @Throws(StatesNotAvailableException::class)
override fun softLockReserve(lockId: UUID, stateRefs: NonEmptySet<StateRef>) { override fun softLockReserve(lockId: UUID, stateRefs: NonEmptySet<StateRef>) {
val softLockTimestamp = clock.instant() val softLockTimestamp = clock.instant()
try { try {
val session = currentDBSession() val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder val criteriaBuilder = session.criteriaBuilder
fun execute(configure: Root<*>.(CriteriaUpdate<*>, Array<Predicate>) -> Any?) = criteriaBuilder.executeUpdate(session) { update -> fun execute(configure: Root<*>.(CriteriaUpdate<*>, Array<Predicate>) -> Any?) = criteriaBuilder.executeUpdate(session, null) { update, _ ->
val persistentStateRefs = stateRefs.map { PersistentStateRef(it.txhash.bytes.toHexString(), it.index) } val persistentStateRefs = stateRefs.map { PersistentStateRef(it.txhash.bytes.toHexString(), it.index) }
val compositeKey = get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name) val compositeKey = get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name)
val stateRefsPredicate = criteriaBuilder.and(compositeKey.`in`(persistentStateRefs)) val stateRefsPredicate = criteriaBuilder.and(compositeKey.`in`(persistentStateRefs))
@ -485,7 +489,11 @@ class NodeVaultService(
} }
if (updatedRows > 0 && updatedRows == stateRefs.size) { if (updatedRows > 0 && updatedRows == stateRefs.size) {
log.trace { "Reserving soft lock states for $lockId: $stateRefs" } log.trace { "Reserving soft lock states for $lockId: $stateRefs" }
FlowStateMachineImpl.currentStateMachine()?.hasSoftLockedStates = true FlowStateMachineImpl.currentStateMachine()?.let {
if (lockId == it.id.uuid) {
it.softLockedStates.addAll(stateRefs)
}
}
} else { } else {
// revert partial soft locks // revert partial soft locks
val revertUpdatedRows = execute { update, commonPredicates -> val revertUpdatedRows = execute { update, commonPredicates ->
@ -508,19 +516,44 @@ class NodeVaultService(
} }
} }
/**
* Whenever executed inside a [FlowStateMachineImpl], if [lockId] refers to the currently running [FlowStateMachineImpl] and [stateRefs] is null,
* then in that case the [FlowStateMachineImpl] instance will, by default, retrieve its set of [StateRef]
* from [FlowStateMachineImpl.softLockedStates] (previously reserved from [softLockReserve]). This set will be then explicitly provided
* to the below query which then leads to the database query optimizer use the primary key index in VAULT_STATES table, instead of lock_id_idx
* in order to search rows to be updated. That way the query will be aligned with the rest of the queries that are following that route as well
* (i.e. making use of the primary key), and therefore its locking order of resources within the database will be aligned
* with the rest queries' locking orders (solving SQL deadlocks).
*
* If [lockId] does not refer to the currently running [FlowStateMachineImpl] and [stateRefs] is null, then it will be using only [lockId] in
* the below query.
*/
@Suppress("NestedBlockDepth", "ComplexMethod")
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) { override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
val softLockTimestamp = clock.instant() val softLockTimestamp = clock.instant()
val session = currentDBSession() val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder val criteriaBuilder = session.criteriaBuilder
fun execute(configure: Root<*>.(CriteriaUpdate<*>, Array<Predicate>) -> Any?) = criteriaBuilder.executeUpdate(session) { update -> fun execute(stateRefs: NonEmptySet<StateRef>?, configure: Root<*>.(CriteriaUpdate<*>, Array<Predicate>, List<PersistentStateRef>?) -> Any?) =
criteriaBuilder.executeUpdate(session, stateRefs) { update, persistentStateRefs ->
val stateStatusPredication = criteriaBuilder.equal(get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.UNCONSUMED) val stateStatusPredication = criteriaBuilder.equal(get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.UNCONSUMED)
val lockIdPredicate = criteriaBuilder.equal(get<String>(VaultSchemaV1.VaultStates::lockId.name), lockId.toString()) val lockIdPredicate = criteriaBuilder.equal(get<String>(VaultSchemaV1.VaultStates::lockId.name), lockId.toString())
update.set<String>(get<String>(VaultSchemaV1.VaultStates::lockId.name), criteriaBuilder.nullLiteral(String::class.java)) update.set<String>(get<String>(VaultSchemaV1.VaultStates::lockId.name), criteriaBuilder.nullLiteral(String::class.java))
update.set(get<Instant>(VaultSchemaV1.VaultStates::lockUpdateTime.name), softLockTimestamp) update.set(get<Instant>(VaultSchemaV1.VaultStates::lockUpdateTime.name), softLockTimestamp)
configure(update, arrayOf(stateStatusPredication, lockIdPredicate)) configure(update, arrayOf(stateStatusPredication, lockIdPredicate), persistentStateRefs)
} }
if (stateRefs == null) {
val update = execute { update, commonPredicates -> val stateRefsToBeReleased =
stateRefs ?: FlowStateMachineImpl.currentStateMachine()?.let {
// We only hold states under our flowId. For all other lockId fall back to old query mechanism, i.e. stateRefsToBeReleased = null
if (lockId == it.id.uuid && it.softLockedStates.isNotEmpty()) {
NonEmptySet.copyOf(it.softLockedStates)
} else {
null
}
}
if (stateRefsToBeReleased == null) {
val update = execute(null) { update, commonPredicates, _ ->
update.where(*commonPredicates) update.where(*commonPredicates)
} }
if (update > 0) { if (update > 0) {
@ -528,19 +561,21 @@ class NodeVaultService(
} }
} else { } else {
try { try {
val updatedRows = execute { update, commonPredicates -> val updatedRows = execute(stateRefsToBeReleased) { update, commonPredicates, persistentStateRefs ->
val persistentStateRefs = stateRefs.map { PersistentStateRef(it.txhash.bytes.toHexString(), it.index) }
val compositeKey = get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name) val compositeKey = get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name)
val stateRefsPredicate = criteriaBuilder.and(compositeKey.`in`(persistentStateRefs)) val stateRefsPredicate = criteriaBuilder.and(compositeKey.`in`(persistentStateRefs))
update.where(*commonPredicates, stateRefsPredicate) update.where(*commonPredicates, stateRefsPredicate)
} }
if (updatedRows > 0) { if (updatedRows > 0) {
log.trace { "Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefs" } FlowStateMachineImpl.currentStateMachine()?.let {
if (lockId == it.id.uuid) {
it.softLockedStates.removeAll(stateRefsToBeReleased)
}
}
log.trace { "Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefsToBeReleased" }
} }
} catch (e: Exception) { } catch (e: Exception) {
log.error("""soft lock update error attempting to release states for $lockId and $stateRefs") log.error("Soft lock update error attempting to release states for $lockId and $stateRefsToBeReleased", e)
$e.
""")
throw e throw e
} }
} }
@ -819,5 +854,29 @@ class NodeVaultService(
} }
} }
private fun CriteriaBuilder.executeUpdate(
session: Session,
stateRefs: NonEmptySet<StateRef>?,
configure: Root<*>.(CriteriaUpdate<*>, List<PersistentStateRef>?) -> Any?
): Int {
fun doUpdate(persistentStateRefs: List<PersistentStateRef>?): Int {
createCriteriaUpdate(VaultSchemaV1.VaultStates::class.java).let { update ->
update.from(VaultSchemaV1.VaultStates::class.java).run { configure(update, persistentStateRefs) }
return session.createQuery(update).executeUpdate()
}
}
return stateRefs?.let {
// Increase SQL server performance by, processing updates in chunks allowing the database's optimizer to make use of the index.
var updatedRows = 0
it.asSequence()
.map { stateRef -> PersistentStateRef(stateRef.txhash.bytes.toHexString(), stateRef.index) }
.chunked(NodeVaultService.MAX_SQL_IN_CLAUSE_SET)
.forEach { persistentStateRefs ->
updatedRows += doUpdate(persistentStateRefs)
}
updatedRows
} ?: doUpdate(null)
}
/** The Observable returned allows subscribing with custom SafeSubscribers to source [Observable]. */ /** The Observable returned allows subscribing with custom SafeSubscribers to source [Observable]. */
internal fun<T> Observable<T>.resilientOnError(): Observable<T> = Observable.unsafeCreate(OnResilientSubscribe(this, false)) internal fun<T> Observable<T>.resilientOnError(): Observable<T> = Observable.unsafeCreate(OnResilientSubscribe(this, false))

View File

@ -56,6 +56,7 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode import net.corda.testing.node.internal.TestStartedNode
@ -118,7 +119,7 @@ class FlowFrameworkTests {
@Before @Before
fun setUpMockNet() { fun setUpMockNet() {
mockNet = InternalMockNetwork( mockNet = InternalMockNetwork(
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP), cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, FINANCE_CONTRACTS_CORDAPP),
servicePeerAllocationStrategy = RoundRobin() servicePeerAllocationStrategy = RoundRobin()
) )

View File

@ -0,0 +1,330 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Command
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.node.services.statemachine.FlowSoftLocksTests.Companion.queryCashStates
import net.corda.node.services.vault.NodeVaultServiceTest
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOC_NAME
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.vault.VaultFiller
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Before
import org.junit.Test
import java.lang.IllegalStateException
import java.sql.SQLTransientConnectionException
import java.util.UUID
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class FlowSoftLocksTests {
companion object {
fun queryCashStates(softLockingType: QueryCriteria.SoftLockingType, vaultService: VaultService) =
vaultService.queryBy<Cash.State>(
QueryCriteria.VaultQueryCriteria(
softLockingCondition = QueryCriteria.SoftLockingCondition(
softLockingType
)
)
).states.map { it.ref }.toSet()
val EMPTY_SET = emptySet<StateRef>()
}
private lateinit var mockNet: InternalMockNetwork
private lateinit var aliceNode: TestStartedNode
private lateinit var notaryIdentity: Party
@Before
fun setUpMockNet() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, FINANCE_CONTRACTS_CORDAPP)
)
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
notaryIdentity = mockNet.defaultNotaryIdentity
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test(timeout=300_000)
fun `flow reserves fungible states with its own flow id and then manually releases them`() {
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = vaultStates),
SoftLockAction(SoftLockingAction.UNLOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.UNLOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
assertTrue(flowCompleted)
assertEquals(vaultStates, queryCashStates(QueryCriteria.SoftLockingType.UNLOCKED_ONLY, aliceNode.services.vaultService))
}
@Test(timeout=300_000)
fun `flow reserves fungible states with its own flow id and by default releases them when completing`() {
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = vaultStates)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
assertTrue(flowCompleted)
assertEquals(vaultStates, queryCashStates(QueryCriteria.SoftLockingType.UNLOCKED_ONLY, aliceNode.services.vaultService))
}
@Test(timeout=300_000)
fun `flow reserves fungible states with its own flow id and by default releases them when errors`() {
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val softLockActions = arrayOf(
SoftLockAction(
SoftLockingAction.LOCK,
null,
vaultStates,
ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY),
expectedSoftLockedStates = vaultStates,
exception = IllegalStateException("Throwing error after flow has soft locked states")
)
)
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
}
assertEquals(vaultStates, queryCashStates(QueryCriteria.SoftLockingType.UNLOCKED_ONLY, aliceNode.services.vaultService))
LockingUnlockingFlow.throwOnlyOnce = true
}
@Test(timeout=300_000)
fun `flow reserves fungible states with random id and then manually releases them`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET),
SoftLockAction(SoftLockingAction.UNLOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.UNLOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
assertTrue(flowCompleted)
assertEquals(vaultStates, queryCashStates(QueryCriteria.SoftLockingType.UNLOCKED_ONLY, aliceNode.services.vaultService))
}
@Test(timeout=300_000)
fun `flow reserves fungible states with random id and does not release them upon completing`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
assertTrue(flowCompleted)
assertEquals(vaultStates, queryCashStates(QueryCriteria.SoftLockingType.LOCKED_ONLY, aliceNode.services.vaultService))
}
@Test(timeout=300_000)
fun `flow only releases by default reserved states with flow id upon completing`() {
// lock with flow id and random id, dont manually release any. At the end, check that only flow id ones got unlocked.
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList()
val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet()
val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet()
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, null, flowIdStates, ExpectedSoftLocks(flowIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = flowIdStates),
SoftLockAction(SoftLockingAction.LOCK, randomId, randomIdStates, ExpectedSoftLocks(flowIdStates + randomIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = flowIdStates)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
assertTrue(flowCompleted)
assertEquals(flowIdStates, queryCashStates(QueryCriteria.SoftLockingType.UNLOCKED_ONLY, aliceNode.services.vaultService))
assertEquals(randomIdStates, queryCashStates(QueryCriteria.SoftLockingType.LOCKED_ONLY, aliceNode.services.vaultService))
}
@Test(timeout=300_000)
fun `flow reserves fungible states with flow id and random id, then releases the flow id ones - assert the random id ones are still locked`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList()
val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet()
val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet()
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, null, flowIdStates, ExpectedSoftLocks(flowIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = flowIdStates),
SoftLockAction(SoftLockingAction.LOCK, randomId, randomIdStates, ExpectedSoftLocks(flowIdStates + randomIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = flowIdStates),
SoftLockAction(SoftLockingAction.UNLOCK, null, flowIdStates, ExpectedSoftLocks(randomIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
assertTrue(flowCompleted)
assertEquals(flowIdStates, queryCashStates(QueryCriteria.SoftLockingType.UNLOCKED_ONLY, aliceNode.services.vaultService))
assertEquals(randomIdStates, queryCashStates(QueryCriteria.SoftLockingType.LOCKED_ONLY, aliceNode.services.vaultService))
}
@Test(timeout=300_000)
fun `flow reserves fungible states with flow id and random id, then releases the random id ones - assert the flow id ones are still locked inside the flow`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList()
val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet()
val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet()
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, null, flowIdStates, ExpectedSoftLocks(flowIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = flowIdStates),
SoftLockAction(SoftLockingAction.LOCK, randomId, randomIdStates, ExpectedSoftLocks(flowIdStates + randomIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = flowIdStates),
SoftLockAction(SoftLockingAction.UNLOCK, randomId, randomIdStates, ExpectedSoftLocks(flowIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = flowIdStates)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
assertTrue(flowCompleted)
assertEquals(flowIdStates + randomIdStates, queryCashStates(QueryCriteria.SoftLockingType.UNLOCKED_ONLY, aliceNode.services.vaultService))
}
@Test(timeout=300_000)
fun `flow soft locks fungible state upon creation`() {
var lockedStates = 0
CreateFungibleStateFLow.hook = { vaultService ->
lockedStates = vaultService.queryBy<NodeVaultServiceTest.FungibleFoo>(
QueryCriteria.VaultQueryCriteria(softLockingCondition = QueryCriteria.SoftLockingCondition(QueryCriteria.SoftLockingType.LOCKED_ONLY))
).states.size
}
aliceNode.services.startFlow(CreateFungibleStateFLow()).resultFuture.getOrThrow(30.seconds)
assertEquals(1, lockedStates)
}
@Test(timeout=300_000)
fun `when flow soft locks, then errors and retries from previous checkpoint, softLockedStates are reverted back correctly`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList()
val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet()
val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet()
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, null, flowIdStates, ExpectedSoftLocks(flowIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = flowIdStates),
SoftLockAction(
SoftLockingAction.LOCK,
randomId,
randomIdStates,
ExpectedSoftLocks(flowIdStates + randomIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY),
expectedSoftLockedStates = flowIdStates,
doCheckpoint = true
),
SoftLockAction(SoftLockingAction.UNLOCK, null, flowIdStates, ExpectedSoftLocks(randomIdStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET),
SoftLockAction(
SoftLockingAction.UNLOCK,
randomId,
randomIdStates,
ExpectedSoftLocks(EMPTY_SET, QueryCriteria.SoftLockingType.LOCKED_ONLY),
expectedSoftLockedStates = EMPTY_SET,
exception = SQLTransientConnectionException("connection is not available")
)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
assertTrue(flowCompleted)
assertEquals(flowIdStates + randomIdStates, queryCashStates(QueryCriteria.SoftLockingType.UNLOCKED_ONLY, aliceNode.services.vaultService))
LockingUnlockingFlow.throwOnlyOnce = true
}
private fun fillVault(node: TestStartedNode, thisManyStates: Int): Vault<Cash.State>? {
val bankNode = mockNet.createPartyNode(BOC_NAME)
val bank = bankNode.info.singleIdentity()
val cashIssuer = bank.ref(1)
return node.database.transaction {
VaultFiller(node.services, TestIdentity(notaryIdentity.name, 20), notaryIdentity).fillWithSomeTestCash(
100.DOLLARS,
bankNode.services,
thisManyStates,
thisManyStates,
cashIssuer
)
}
}
}
enum class SoftLockingAction {
LOCK,
UNLOCK
}
data class ExpectedSoftLocks(val states: Set<StateRef>, val queryCriteria: QueryCriteria.SoftLockingType)
/**
* If [lockId] is set to null, it will be populated with the flowId within the flow.
*/
data class SoftLockAction(val action: SoftLockingAction,
var lockId: UUID?,
val states: Set<StateRef>,
val expectedSoftLocks: ExpectedSoftLocks,
val expectedSoftLockedStates: Set<StateRef>,
val exception: Exception? = null,
val doCheckpoint: Boolean = false)
internal class LockingUnlockingFlow(private val softLockActions: Array<SoftLockAction>): FlowLogic<Boolean>() {
companion object {
var throwOnlyOnce = true
}
@Suspendable
override fun call(): Boolean {
for (softLockAction in softLockActions) {
if (softLockAction.lockId == null) { softLockAction.lockId = stateMachine.id.uuid }
when (softLockAction.action) {
SoftLockingAction.LOCK -> {
serviceHub.vaultService.softLockReserve(softLockAction.lockId!!, NonEmptySet.copyOf(softLockAction.states))
// We checkpoint here so that, upon retrying to assert state after reserving
if (softLockAction.doCheckpoint) {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
}
assertEquals(softLockAction.expectedSoftLocks.states, queryCashStates(softLockAction.expectedSoftLocks.queryCriteria, serviceHub.vaultService))
assertEquals(softLockAction.expectedSoftLockedStates, (stateMachine as? FlowStateMachineImpl<*>)!!.softLockedStates)
}
SoftLockingAction.UNLOCK -> {
serviceHub.vaultService.softLockRelease(softLockAction.lockId!!, NonEmptySet.copyOf(softLockAction.states))
assertEquals(softLockAction.expectedSoftLocks.states, queryCashStates(softLockAction.expectedSoftLocks.queryCriteria, serviceHub.vaultService))
assertEquals(softLockAction.expectedSoftLockedStates, (stateMachine as? FlowStateMachineImpl<*>)!!.softLockedStates)
}
}
softLockAction.exception?.let {
if (throwOnlyOnce) {
throwOnlyOnce = false
throw it
}
}
}
return true
}
}
internal class CreateFungibleStateFLow : FlowLogic<Unit>() {
companion object {
var hook: ((VaultService) -> Unit)? = null
}
@Suspendable
override fun call() {
val issuer = serviceHub.myInfo.legalIdentities.first()
val notary = serviceHub.networkMapCache.notaryIdentities[0]
val fungibleState = NodeVaultServiceTest.FungibleFoo(100.DOLLARS, listOf(issuer))
val txCommand = Command(DummyContract.Commands.Create(), issuer.owningKey)
val txBuilder = TransactionBuilder(notary)
.addOutputState(fungibleState, DummyContract.PROGRAM_ID)
.addCommand(txCommand)
val signedTx = serviceHub.signInitialTransaction(txBuilder)
serviceHub.recordTransactions(signedTx)
hook?.invoke(serviceHub.vaultService)
}
}

View File

@ -406,6 +406,47 @@ class NodeVaultServiceTest {
} }
} }
@Test(timeout=300_000)
fun `softLockRelease - correctly releases n locked states`() {
fun queryStates(softLockingType: SoftLockingType) =
vaultService.queryBy<Cash.State>(VaultQueryCriteria(softLockingCondition = SoftLockingCondition(softLockingType))).states
database.transaction {
vaultFiller.fillWithSomeTestCash(100.DOLLARS, issuerServices, 100, DUMMY_CASH_ISSUER)
}
val softLockId = UUID.randomUUID()
val lockCount = NodeVaultService.MAX_SQL_IN_CLAUSE_SET * 2
database.transaction {
assertEquals(100, queryStates(SoftLockingType.UNLOCKED_ONLY).size)
val unconsumedStates = vaultService.queryBy<Cash.State>().states
val lockSet = mutableListOf<StateRef>()
for (i in 0 until lockCount) {
lockSet.add(unconsumedStates[i].ref)
}
vaultService.softLockReserve(softLockId, NonEmptySet.copyOf(lockSet))
assertEquals(lockCount, queryStates(SoftLockingType.LOCKED_ONLY).size)
val unlockSet0 = mutableSetOf<StateRef>()
for (i in 0 until NodeVaultService.MAX_SQL_IN_CLAUSE_SET + 1) {
unlockSet0.add(lockSet[i])
}
vaultService.softLockRelease(softLockId, NonEmptySet.copyOf(unlockSet0))
assertEquals(NodeVaultService.MAX_SQL_IN_CLAUSE_SET - 1, queryStates(SoftLockingType.LOCKED_ONLY).size)
val unlockSet1 = mutableSetOf<StateRef>()
for (i in NodeVaultService.MAX_SQL_IN_CLAUSE_SET + 1 until NodeVaultService.MAX_SQL_IN_CLAUSE_SET + 3) {
unlockSet1.add(lockSet[i])
}
vaultService.softLockRelease(softLockId, NonEmptySet.copyOf(unlockSet1))
assertEquals(NodeVaultService.MAX_SQL_IN_CLAUSE_SET - 1 - 2, queryStates(SoftLockingType.LOCKED_ONLY).size)
vaultService.softLockRelease(softLockId) // release the rest
assertEquals(100, queryStates(SoftLockingType.UNLOCKED_ONLY).size)
}
}
@Test(timeout=300_000) @Test(timeout=300_000)
fun `unconsumedStatesForSpending exact amount`() { fun `unconsumedStatesForSpending exact amount`() {
database.transaction { database.transaction {