mirror of
https://github.com/corda/corda.git
synced 2025-06-18 07:08:15 +00:00
Upgrade to Requery 1.2.1 with Composite Key support (#443)
* Test SELECT WHERE IN composite key using requery 1.2.0 Upgraded Vault Service code to use Requery 1.2.0 SELECT .. WHERE IN Updated generated schema code with Requery 1.2.0 Upgrade to Requery 1.2.1 Upgrade to Requery 1.2.1 - converted to use update DSL with composite key Removed redundant JDBC SQL test cases. Minor updates following PR review comments from RP. * Streamline companion object initialisation.
This commit is contained in:
@ -2,11 +2,13 @@ package net.corda.node.services.vault
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import io.requery.PersistenceException
|
||||
import io.requery.TransactionIsolation
|
||||
import io.requery.kotlin.`in`
|
||||
import io.requery.kotlin.eq
|
||||
import io.requery.kotlin.isNull
|
||||
import io.requery.kotlin.notNull
|
||||
import io.requery.query.RowExpression
|
||||
import net.corda.contracts.asset.Cash
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.bufferUntilSubscribed
|
||||
@ -32,14 +34,11 @@ import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.vault.schemas.*
|
||||
import net.corda.node.utilities.StrandLocalTransactionManager
|
||||
import net.corda.node.utilities.bufferUntilDatabaseCommit
|
||||
import net.corda.node.utilities.wrapWithDatabaseTransaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.PublicKey
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
@ -60,6 +59,9 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
|
||||
private companion object {
|
||||
val log = loggerFor<NodeVaultService>()
|
||||
|
||||
// Define composite primary key used in Requery Expression
|
||||
val stateRefCompositeColumn : RowExpression = RowExpression.of(listOf(VaultStatesEntity.TX_ID, VaultStatesEntity.INDEX))
|
||||
}
|
||||
|
||||
val configuration = RequeryConfiguration(dataSourceProperties)
|
||||
@ -248,83 +250,72 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
}
|
||||
|
||||
@Throws(StatesNotAvailableException::class)
|
||||
override fun softLockReserve(id: UUID, stateRefs: Set<StateRef>) {
|
||||
override fun softLockReserve(lockId: UUID, stateRefs: Set<StateRef>) {
|
||||
if (stateRefs.isNotEmpty()) {
|
||||
val stateRefsAsStr = stateRefsToCompositeKeyStr(stateRefs.toList())
|
||||
val softLockTimestamp = services.clock.instant()
|
||||
// TODO: awaiting support of UPDATE WHERE <Composite key> IN in Requery DSL
|
||||
val updateStatement = """
|
||||
UPDATE VAULT_STATES SET lock_id = '$id', lock_timestamp = '$softLockTimestamp'
|
||||
WHERE ((transaction_id, output_index) IN ($stateRefsAsStr))
|
||||
AND (state_status = 0)
|
||||
AND ((lock_id = '$id') OR (lock_id is null));
|
||||
"""
|
||||
val statement = configuration.jdbcSession().createStatement()
|
||||
log.debug(updateStatement)
|
||||
val stateRefArgs = stateRefArgs(stateRefs)
|
||||
try {
|
||||
val rs = statement.executeUpdate(updateStatement)
|
||||
if (rs > 0 && rs == stateRefs.size) {
|
||||
log.trace("Reserving soft lock states for $id: $stateRefs")
|
||||
}
|
||||
else {
|
||||
// revert partial soft locks
|
||||
val revertUpdateStatement = """
|
||||
UPDATE VAULT_STATES SET lock_id = null
|
||||
WHERE ((transaction_id, output_index) IN ($stateRefsAsStr))
|
||||
AND (lock_timestamp = '$softLockTimestamp') AND (lock_id = '$id');
|
||||
"""
|
||||
log.debug(revertUpdateStatement)
|
||||
val rsr = statement.executeUpdate(revertUpdateStatement)
|
||||
if (rsr > 0) {
|
||||
log.trace("Reverting $rsr partially soft locked states for $id")
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val updatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, lockId.toString())
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, softLockTimestamp)
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and((VaultStatesEntity.LOCK_ID eq lockId.toString()) or (VaultStatesEntity.LOCK_ID.isNull()))
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs)).get().value()
|
||||
if (updatedRows > 0 && updatedRows == stateRefs.size) {
|
||||
log.trace("Reserving soft lock states for $lockId: $stateRefs")
|
||||
} else {
|
||||
// revert partial soft locks
|
||||
val revertUpdatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.where(VaultStatesEntity.LOCK_UPDATE_TIME eq softLockTimestamp)
|
||||
.and(VaultStatesEntity.LOCK_ID eq lockId.toString())
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs)).get().value()
|
||||
if (revertUpdatedRows > 0) {
|
||||
log.trace("Reverting $revertUpdatedRows partially soft locked states for $lockId")
|
||||
}
|
||||
throw StatesNotAvailableException("Attempted to reserve $stateRefs for $lockId but only $updatedRows rows available")
|
||||
}
|
||||
throw StatesNotAvailableException("Attempted to reserve $stateRefs for $id but only $rs rows available")
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
log.error("""soft lock update error attempting to reserve states for $lockId and $stateRefs")
|
||||
$e.
|
||||
""")
|
||||
if (e.cause is StatesNotAvailableException) throw (e.cause as StatesNotAvailableException)
|
||||
}
|
||||
catch (e: SQLException) {
|
||||
log.error("""soft lock update error attempting to reserve states: $stateRefs for $id
|
||||
$e.
|
||||
""")
|
||||
throw StatesNotAvailableException("Failed to reserve $stateRefs for $id", e)
|
||||
}
|
||||
finally { statement.close() }
|
||||
}
|
||||
}
|
||||
|
||||
override fun softLockRelease(id: UUID, stateRefs: Set<StateRef>?) {
|
||||
override fun softLockRelease(lockId: UUID, stateRefs: Set<StateRef>?) {
|
||||
if (stateRefs == null) {
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val update = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, services.clock.instant())
|
||||
.where (VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and (VaultStatesEntity.LOCK_ID eq id.toString()).get()
|
||||
.and (VaultStatesEntity.LOCK_ID eq lockId.toString()).get()
|
||||
if (update.value() > 0) {
|
||||
log.trace("Releasing ${update.value()} soft locked states for $id")
|
||||
log.trace("Releasing ${update.value()} soft locked states for $lockId")
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (stateRefs.isNotEmpty()) {
|
||||
val stateRefsAsStr = stateRefsToCompositeKeyStr(stateRefs.toList())
|
||||
// TODO: awaiting support of UPDATE WHERE <Composite key> IN in Requery DSL
|
||||
val updateStatement = """
|
||||
UPDATE VAULT_STATES SET lock_id = null, lock_timestamp = '${services.clock.instant()}'
|
||||
WHERE (transaction_id, output_index) IN ($stateRefsAsStr)
|
||||
AND (state_status = 0) AND (lock_id = '$id');
|
||||
"""
|
||||
val statement = configuration.jdbcSession().createStatement()
|
||||
log.debug(updateStatement)
|
||||
try {
|
||||
val rs = statement.executeUpdate(updateStatement)
|
||||
if (rs > 0) {
|
||||
log.trace("Releasing $rs soft locked states for $id and stateRefs $stateRefs")
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val updatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, services.clock.instant())
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and(VaultStatesEntity.LOCK_ID eq lockId.toString())
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs(stateRefs))).get().value()
|
||||
if (updatedRows > 0) {
|
||||
log.trace("Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefs")
|
||||
}
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
log.error("""soft lock update error attempting to release states for $id and $stateRefs")
|
||||
$e.
|
||||
""")
|
||||
} finally {
|
||||
statement.close()
|
||||
} catch (e: PersistenceException) {
|
||||
log.error("""soft lock update error attempting to release states for $lockId and $stateRefs")
|
||||
$e.
|
||||
""")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -568,28 +559,17 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
// Retrieve all unconsumed states for this transaction's inputs
|
||||
val consumedStates = HashSet<StateAndRef<ContractState>>()
|
||||
if (tx.inputs.isNotEmpty()) {
|
||||
val stateRefs = stateRefsToCompositeKeyStr(tx.inputs)
|
||||
// TODO: using native JDBC until requery supports SELECT WHERE COMPOSITE_KEY IN
|
||||
// https://github.com/requery/requery/issues/434
|
||||
val statement = configuration.jdbcSession().createStatement()
|
||||
try {
|
||||
// TODO: upgrade to Requery 1.2.0 and rewrite with Requery DSL (https://github.com/requery/requery/issues/434)
|
||||
val rs = statement.executeQuery("SELECT transaction_id, output_index, contract_state " +
|
||||
"FROM vault_states " +
|
||||
"WHERE ((transaction_id, output_index) IN ($stateRefs)) " +
|
||||
"AND (state_status = 0)")
|
||||
while (rs.next()) {
|
||||
val txHash = SecureHash.parse(rs.getString(1))
|
||||
val index = rs.getInt(2)
|
||||
val state = rs.getBytes(3).deserialize<TransactionState<ContractState>>(storageKryo())
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val result = select(VaultStatesEntity::class).
|
||||
where (stateRefCompositeColumn.`in`(stateRefArgs(tx.inputs))).
|
||||
and (VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED)
|
||||
result.get().forEach {
|
||||
val txHash = SecureHash.parse(it.txId)
|
||||
val index = it.index
|
||||
val state = it.contractState.deserialize<TransactionState<ContractState>>(storageKryo())
|
||||
consumedStates.add(StateAndRef(state, StateRef(txHash, index)))
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
log.error("""Failed retrieving state refs for: $stateRefs
|
||||
$e.
|
||||
""")
|
||||
}
|
||||
finally { statement.close() }
|
||||
}
|
||||
|
||||
// Is transaction irrelevant?
|
||||
@ -626,9 +606,9 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to generate a string formatted list of Composite Keys for SQL IN clause
|
||||
* Helper method to generate a string formatted list of Composite Keys for Requery Expression clause
|
||||
*/
|
||||
private fun stateRefsToCompositeKeyStr(stateRefs: List<StateRef>): String {
|
||||
return stateRefs.fold("") { stateRefsAsStr, it -> stateRefsAsStr + "('${it.txhash}','${it.index}')," }.dropLast(1)
|
||||
private fun stateRefArgs(stateRefs: Iterable<StateRef>): List<List<Any>> {
|
||||
return stateRefs.map { listOf("'${it.txhash}'", it.index) }
|
||||
}
|
||||
}
|
@ -4,15 +4,13 @@ import net.corda.contracts.asset.Cash
|
||||
import net.corda.contracts.testing.fillWithSomeTestCash
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.composite
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.node.services.StatesNotAvailableException
|
||||
import net.corda.core.node.services.TxWritableStorageService
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import net.corda.testing.MEGA_CORP
|
||||
@ -132,12 +130,12 @@ class NodeVaultServiceTest {
|
||||
assertThat(services.vaultService.softLockedStates<Cash.State>(softLockId)).hasSize(2)
|
||||
|
||||
// excluding softlocked states
|
||||
val unlockedStates1 = services.vaultService.unconsumedStates<Cash.State>(includeSoftLockedStates = false)
|
||||
val unlockedStates1 = services.vaultService.unconsumedStates<Cash.State>(includeSoftLockedStates = false).toList()
|
||||
assertThat(unlockedStates1).hasSize(1)
|
||||
|
||||
// soft lock release one of the states explicitly
|
||||
services.vaultService.softLockRelease(softLockId, setOf(unconsumedStates[1].ref))
|
||||
val unlockedStates2 = services.vaultService.unconsumedStates<Cash.State>(includeSoftLockedStates = false)
|
||||
val unlockedStates2 = services.vaultService.unconsumedStates<Cash.State>(includeSoftLockedStates = false).toList()
|
||||
assertThat(unlockedStates2).hasSize(2)
|
||||
|
||||
// soft lock release the rest by id
|
||||
@ -231,7 +229,7 @@ class NodeVaultServiceTest {
|
||||
|
||||
// attempt to lock all 3 states with LockId2
|
||||
databaseTransaction(database) {
|
||||
assertThatExceptionOfType(FlowException::class.java).isThrownBy(
|
||||
assertThatExceptionOfType(StatesNotAvailableException::class.java).isThrownBy(
|
||||
{ vault.softLockReserve(softLockId2, stateRefsToSoftLock) }
|
||||
).withMessageContaining("only 2 rows available").withNoCause()
|
||||
}
|
||||
|
Reference in New Issue
Block a user