mirror of
https://github.com/corda/corda.git
synced 2025-05-30 14:14:29 +00:00
[CORDA-824]: fix resource leak in Cash selection (#2155)
[CORDA-824]: fix resource leak in Cash selection and some example class
This commit is contained in:
parent
b84fdd3ffc
commit
5a6f2a19b3
@ -24,6 +24,7 @@ object CustomVaultQuery {
|
|||||||
private companion object {
|
private companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun rebalanceCurrencyReserves(): List<Amount<Currency>> {
|
fun rebalanceCurrencyReserves(): List<Amount<Currency>> {
|
||||||
val nativeQuery = """
|
val nativeQuery = """
|
||||||
select
|
select
|
||||||
@ -44,16 +45,18 @@ object CustomVaultQuery {
|
|||||||
"""
|
"""
|
||||||
log.info("SQL to execute: $nativeQuery")
|
log.info("SQL to execute: $nativeQuery")
|
||||||
val session = services.jdbcSession()
|
val session = services.jdbcSession()
|
||||||
val prepStatement = session.prepareStatement(nativeQuery)
|
return session.prepareStatement(nativeQuery).use { prepStatement ->
|
||||||
val rs = prepStatement.executeQuery()
|
prepStatement.executeQuery().use { rs ->
|
||||||
val topUpLimits: MutableList<Amount<Currency>> = mutableListOf()
|
val topUpLimits: MutableList<Amount<Currency>> = mutableListOf()
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
val currencyStr = rs.getString(1)
|
val currencyStr = rs.getString(1)
|
||||||
val amount = rs.getLong(2)
|
val amount = rs.getLong(2)
|
||||||
log.info("$currencyStr : $amount")
|
log.info("$currencyStr : $amount")
|
||||||
topUpLimits.add(Amount(amount, Currency.getInstance(currencyStr)))
|
topUpLimits.add(Amount(amount, Currency.getInstance(currencyStr)))
|
||||||
|
}
|
||||||
|
topUpLimits
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return topUpLimits
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -69,6 +72,7 @@ object TopupIssuerFlow {
|
|||||||
data class TopupRequest(val issueToParty: Party,
|
data class TopupRequest(val issueToParty: Party,
|
||||||
val issuerPartyRef: OpaqueBytes,
|
val issuerPartyRef: OpaqueBytes,
|
||||||
val notaryParty: Party)
|
val notaryParty: Party)
|
||||||
|
|
||||||
@InitiatingFlow
|
@InitiatingFlow
|
||||||
@StartableByRPC
|
@StartableByRPC
|
||||||
class TopupIssuanceRequester(val issueToParty: Party,
|
class TopupIssuanceRequester(val issueToParty: Party,
|
||||||
|
@ -69,13 +69,14 @@ abstract class AbstractCashSelection {
|
|||||||
* with this notary are included.
|
* with this notary are included.
|
||||||
* @param onlyFromIssuerParties Optional issuer parties to match against.
|
* @param onlyFromIssuerParties Optional issuer parties to match against.
|
||||||
* @param withIssuerRefs Optional issuer references to match against.
|
* @param withIssuerRefs Optional issuer references to match against.
|
||||||
* @return JDBC ResultSet with the matching states that were found. If sufficient funds were found these will be locked,
|
* @param withResultSet Function that contains the business logic. The JDBC ResultSet with the matching states that were found. If sufficient funds were found these will be locked,
|
||||||
* otherwise what is available is returned unlocked for informational purposes.
|
* otherwise what is available is returned unlocked for informational purposes.
|
||||||
|
* @return The result of the withResultSet function
|
||||||
*/
|
*/
|
||||||
abstract fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
|
abstract fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
|
||||||
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>) : ResultSet
|
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean): Boolean
|
||||||
|
|
||||||
override abstract fun toString() : String
|
override abstract fun toString(): String
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Query to gather Cash states that are available and retry if they are temporarily unavailable.
|
* Query to gather Cash states that are available and retry if they are temporarily unavailable.
|
||||||
@ -124,34 +125,40 @@ abstract class AbstractCashSelection {
|
|||||||
try {
|
try {
|
||||||
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
||||||
// the softLockReserve update will detect whether we try to lock states locked by others
|
// the softLockReserve update will detect whether we try to lock states locked by others
|
||||||
val rs = executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs)
|
return executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) { rs ->
|
||||||
stateAndRefs.clear()
|
stateAndRefs.clear()
|
||||||
|
|
||||||
var totalPennies = 0L
|
var totalPennies = 0L
|
||||||
val stateRefs = mutableSetOf<StateRef>()
|
val stateRefs = mutableSetOf<StateRef>()
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
val txHash = SecureHash.parse(rs.getString(1))
|
val txHash = SecureHash.parse(rs.getString(1))
|
||||||
val index = rs.getInt(2)
|
val index = rs.getInt(2)
|
||||||
val pennies = rs.getLong(3)
|
val pennies = rs.getLong(3)
|
||||||
totalPennies = rs.getLong(4)
|
totalPennies = rs.getLong(4)
|
||||||
val rowLockId = rs.getString(5)
|
val rowLockId = rs.getString(5)
|
||||||
stateRefs.add(StateRef(txHash, index))
|
stateRefs.add(StateRef(txHash, index))
|
||||||
log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" }
|
log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" }
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stateRefs.isNotEmpty()) {
|
||||||
|
// TODO: future implementation to retrieve contract states from a Vault BLOB store
|
||||||
|
stateAndRefs.addAll(services.loadStates(stateRefs) as Collection<StateAndRef<Cash.State>>)
|
||||||
|
}
|
||||||
|
|
||||||
|
val success = stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity
|
||||||
|
if (success) {
|
||||||
|
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
||||||
|
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
||||||
|
|
||||||
|
// With the current single threaded state machine available states are guaranteed to lock.
|
||||||
|
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
||||||
|
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
||||||
|
} else {
|
||||||
|
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
||||||
|
}
|
||||||
|
success
|
||||||
}
|
}
|
||||||
if (stateRefs.isNotEmpty())
|
|
||||||
// TODO: future implementation to retrieve contract states from a Vault BLOB store
|
|
||||||
stateAndRefs.addAll(services.loadStates(stateRefs) as Collection<StateAndRef<Cash.State>>)
|
|
||||||
|
|
||||||
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
|
|
||||||
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
|
||||||
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
|
||||||
|
|
||||||
// With the current single threaded state machine available states are guaranteed to lock.
|
|
||||||
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
|
||||||
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
|
||||||
// retry as more states may become available
|
// retry as more states may become available
|
||||||
} catch (e: SQLException) {
|
} catch (e: SQLException) {
|
||||||
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
|
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
|
||||||
|
@ -30,9 +30,8 @@ class CashSelectionH2Impl : AbstractCashSelection() {
|
|||||||
// 2) H2 uses session variables to perform this accumulator function:
|
// 2) H2 uses session variables to perform this accumulator function:
|
||||||
// http://www.h2database.com/html/functions.html#set
|
// http://www.h2database.com/html/functions.html#set
|
||||||
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
|
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
|
||||||
override fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
|
override fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean): Boolean {
|
||||||
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>) : ResultSet {
|
connection.createStatement().use { it.execute("CALL SET(@t, CAST(0 AS BIGINT));") }
|
||||||
connection.createStatement().execute("CALL SET(@t, CAST(0 AS BIGINT));")
|
|
||||||
|
|
||||||
val selectJoin = """
|
val selectJoin = """
|
||||||
SELECT vs.transaction_id, vs.output_index, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
|
SELECT vs.transaction_id, vs.output_index, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
|
||||||
@ -50,19 +49,22 @@ class CashSelectionH2Impl : AbstractCashSelection() {
|
|||||||
" AND ccs.issuer_ref IN (?)" else "")
|
" AND ccs.issuer_ref IN (?)" else "")
|
||||||
|
|
||||||
// Use prepared statement for protection against SQL Injection (http://www.h2database.com/html/advanced.html#sql_injection)
|
// Use prepared statement for protection against SQL Injection (http://www.h2database.com/html/advanced.html#sql_injection)
|
||||||
val psSelectJoin = connection.prepareStatement(selectJoin)
|
connection.prepareStatement(selectJoin).use { psSelectJoin ->
|
||||||
var pIndex = 0
|
var pIndex = 0
|
||||||
psSelectJoin.setString(++pIndex, amount.token.currencyCode)
|
psSelectJoin.setString(++pIndex, amount.token.currencyCode)
|
||||||
psSelectJoin.setLong(++pIndex, amount.quantity)
|
psSelectJoin.setLong(++pIndex, amount.quantity)
|
||||||
psSelectJoin.setString(++pIndex, lockId.toString())
|
psSelectJoin.setString(++pIndex, lockId.toString())
|
||||||
if (notary != null)
|
if (notary != null)
|
||||||
psSelectJoin.setString(++pIndex, notary.name.toString())
|
psSelectJoin.setString(++pIndex, notary.name.toString())
|
||||||
if (onlyFromIssuerParties.isNotEmpty())
|
if (onlyFromIssuerParties.isNotEmpty())
|
||||||
psSelectJoin.setObject(++pIndex, onlyFromIssuerParties.map { it.owningKey.toStringShort() as Any}.toTypedArray() )
|
psSelectJoin.setObject(++pIndex, onlyFromIssuerParties.map { it.owningKey.toStringShort() as Any }.toTypedArray())
|
||||||
if (withIssuerRefs.isNotEmpty())
|
if (withIssuerRefs.isNotEmpty())
|
||||||
psSelectJoin.setObject(++pIndex, withIssuerRefs.map { it.bytes as Any }.toTypedArray())
|
psSelectJoin.setObject(++pIndex, withIssuerRefs.map { it.bytes as Any }.toTypedArray())
|
||||||
log.debug { psSelectJoin.toString() }
|
log.debug { psSelectJoin.toString() }
|
||||||
|
|
||||||
return psSelectJoin.executeQuery()
|
psSelectJoin.executeQuery().use { rs ->
|
||||||
|
return withResultSet(rs)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -19,7 +19,7 @@ class CashSelectionMySQLImpl : AbstractCashSelection() {
|
|||||||
return metadata.driverName == JDBC_DRIVER_NAME
|
return metadata.driverName == JDBC_DRIVER_NAME
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun executeQuery(statement: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, issuerKeysStr: Set<AbstractParty>, issuerRefsStr: Set<OpaqueBytes>): ResultSet {
|
override fun executeQuery(statement: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, issuerKeysStr: Set<AbstractParty>, issuerRefsStr: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean): Boolean {
|
||||||
TODO("MySQL cash selection not implemented")
|
TODO("MySQL cash selection not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,8 +27,7 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
|
|||||||
// 2) The window function accumulated column (`total`) does not include the current row (starts from 0) and cannot
|
// 2) The window function accumulated column (`total`) does not include the current row (starts from 0) and cannot
|
||||||
// appear in the WHERE clause, hence restricting row selection and adjusting the returned total in the outer query.
|
// appear in the WHERE clause, hence restricting row selection and adjusting the returned total in the outer query.
|
||||||
// 3) Currently (version 9.6), FOR UPDATE cannot be specified with window functions
|
// 3) Currently (version 9.6), FOR UPDATE cannot be specified with window functions
|
||||||
override fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
|
override fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean): Boolean {
|
||||||
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>) : ResultSet {
|
|
||||||
val selectJoin = """SELECT nested.transaction_id, nested.output_index, nested.pennies,
|
val selectJoin = """SELECT nested.transaction_id, nested.output_index, nested.pennies,
|
||||||
nested.total+nested.pennies as total_pennies, nested.lock_id
|
nested.total+nested.pennies as total_pennies, nested.lock_id
|
||||||
FROM
|
FROM
|
||||||
@ -51,29 +50,32 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
|
|||||||
nested WHERE nested.total < ?
|
nested WHERE nested.total < ?
|
||||||
"""
|
"""
|
||||||
|
|
||||||
val statement = connection.prepareStatement(selectJoin)
|
connection.prepareStatement(selectJoin).use { statement ->
|
||||||
statement.setString(1, amount.token.toString())
|
statement.setString(1, amount.token.toString())
|
||||||
statement.setString(2, lockId.toString())
|
statement.setString(2, lockId.toString())
|
||||||
var paramOffset = 0
|
var paramOffset = 0
|
||||||
if (notary != null) {
|
if (notary != null) {
|
||||||
statement.setString(3, notary.name.toString())
|
statement.setString(3, notary.name.toString())
|
||||||
paramOffset += 1
|
paramOffset += 1
|
||||||
}
|
}
|
||||||
if (onlyFromIssuerParties.isNotEmpty()) {
|
if (onlyFromIssuerParties.isNotEmpty()) {
|
||||||
val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map
|
val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map
|
||||||
{ it.owningKey.toBase58String() }.toTypedArray())
|
{ it.owningKey.toBase58String() }.toTypedArray())
|
||||||
statement.setArray(3 + paramOffset, issuerKeys)
|
statement.setArray(3 + paramOffset, issuerKeys)
|
||||||
paramOffset += 1
|
paramOffset += 1
|
||||||
}
|
}
|
||||||
if (withIssuerRefs.isNotEmpty()) {
|
if (withIssuerRefs.isNotEmpty()) {
|
||||||
val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map
|
val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map
|
||||||
{ it.bytes }.toTypedArray())
|
{ it.bytes }.toTypedArray())
|
||||||
statement.setArray(3 + paramOffset, issuerRefs)
|
statement.setArray(3 + paramOffset, issuerRefs)
|
||||||
paramOffset += 1
|
paramOffset += 1
|
||||||
}
|
}
|
||||||
statement.setLong(3 + paramOffset, amount.quantity)
|
statement.setLong(3 + paramOffset, amount.quantity)
|
||||||
log.debug { statement.toString() }
|
log.debug { statement.toString() }
|
||||||
|
|
||||||
return statement.executeQuery()
|
statement.executeQuery().use { rs ->
|
||||||
|
return withResultSet(rs)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user