From 5a6f2a19b3511956c75846235f189f14ba7b3d4c Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Thu, 30 Nov 2017 16:17:18 +0000 Subject: [PATCH] [CORDA-824]: fix resource leak in Cash selection (#2155) [CORDA-824]: fix resource leak in Cash selection and some example class --- .../kotlin/net/corda/docs/CustomVaultQuery.kt | 22 ++++--- .../cash/selection/AbstractCashSelection.kt | 63 ++++++++++--------- .../cash/selection/CashSelectionH2Impl.kt | 34 +++++----- .../cash/selection/CashSelectionMySQLImpl.kt | 2 +- .../selection/CashSelectionPostgreSQLImpl.kt | 52 +++++++-------- 5 files changed, 94 insertions(+), 79 deletions(-) diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/CustomVaultQuery.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/CustomVaultQuery.kt index 506dc8bc02..ac592eaa8f 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/CustomVaultQuery.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/CustomVaultQuery.kt @@ -24,6 +24,7 @@ object CustomVaultQuery { private companion object { private val log = contextLogger() } + fun rebalanceCurrencyReserves(): List> { val nativeQuery = """ select @@ -44,16 +45,18 @@ object CustomVaultQuery { """ log.info("SQL to execute: $nativeQuery") val session = services.jdbcSession() - val prepStatement = session.prepareStatement(nativeQuery) - val rs = prepStatement.executeQuery() - val topUpLimits: MutableList> = mutableListOf() - while (rs.next()) { - val currencyStr = rs.getString(1) - val amount = rs.getLong(2) - log.info("$currencyStr : $amount") - topUpLimits.add(Amount(amount, Currency.getInstance(currencyStr))) + return session.prepareStatement(nativeQuery).use { prepStatement -> + prepStatement.executeQuery().use { rs -> + val topUpLimits: MutableList> = mutableListOf() + while (rs.next()) { + val currencyStr = rs.getString(1) + val amount = rs.getLong(2) + log.info("$currencyStr : $amount") + topUpLimits.add(Amount(amount, Currency.getInstance(currencyStr))) + } + topUpLimits + } } - return topUpLimits } } } @@ -69,6 +72,7 @@ object TopupIssuerFlow { data class TopupRequest(val issueToParty: Party, val issuerPartyRef: OpaqueBytes, val notaryParty: Party) + @InitiatingFlow @StartableByRPC class TopupIssuanceRequester(val issueToParty: Party, diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt index b5d2113f8d..eff93f6d3f 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt @@ -69,13 +69,14 @@ abstract class AbstractCashSelection { * with this notary are included. * @param onlyFromIssuerParties Optional issuer parties to match against. * @param withIssuerRefs Optional issuer references to match against. - * @return JDBC ResultSet with the matching states that were found. If sufficient funds were found these will be locked, + * @param withResultSet Function that contains the business logic. The JDBC ResultSet with the matching states that were found. If sufficient funds were found these will be locked, * otherwise what is available is returned unlocked for informational purposes. + * @return The result of the withResultSet function */ abstract fun executeQuery(connection: Connection, amount: Amount, lockId: UUID, notary: Party?, - onlyFromIssuerParties: Set, withIssuerRefs: Set) : ResultSet + onlyFromIssuerParties: Set, withIssuerRefs: Set, withResultSet: (ResultSet) -> Boolean): Boolean - override abstract fun toString() : String + override abstract fun toString(): String /** * Query to gather Cash states that are available and retry if they are temporarily unavailable. @@ -124,34 +125,40 @@ abstract class AbstractCashSelection { try { // we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null) // the softLockReserve update will detect whether we try to lock states locked by others - val rs = executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) - stateAndRefs.clear() + return executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) { rs -> + stateAndRefs.clear() - var totalPennies = 0L - val stateRefs = mutableSetOf() - while (rs.next()) { - val txHash = SecureHash.parse(rs.getString(1)) - val index = rs.getInt(2) - val pennies = rs.getLong(3) - totalPennies = rs.getLong(4) - val rowLockId = rs.getString(5) - stateRefs.add(StateRef(txHash, index)) - log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" } + var totalPennies = 0L + val stateRefs = mutableSetOf() + while (rs.next()) { + val txHash = SecureHash.parse(rs.getString(1)) + val index = rs.getInt(2) + val pennies = rs.getLong(3) + totalPennies = rs.getLong(4) + val rowLockId = rs.getString(5) + stateRefs.add(StateRef(txHash, index)) + log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" } + } + + if (stateRefs.isNotEmpty()) { + // TODO: future implementation to retrieve contract states from a Vault BLOB store + stateAndRefs.addAll(services.loadStates(stateRefs) as Collection>) + } + + val success = stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity + if (success) { + // we should have a minimum number of states to satisfy our selection `amount` criteria + log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs") + + // With the current single threaded state machine available states are guaranteed to lock. + // TODO However, we will have to revisit these methods in the future multi-threaded. + services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet()) + } else { + log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}") + } + success } - if (stateRefs.isNotEmpty()) - // TODO: future implementation to retrieve contract states from a Vault BLOB store - stateAndRefs.addAll(services.loadStates(stateRefs) as Collection>) - if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) { - // we should have a minimum number of states to satisfy our selection `amount` criteria - log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs") - - // With the current single threaded state machine available states are guaranteed to lock. - // TODO However, we will have to revisit these methods in the future multi-threaded. - services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet()) - return true - } - log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}") // retry as more states may become available } catch (e: SQLException) { log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId] diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt index 440047f19d..3c21246dee 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt @@ -30,9 +30,8 @@ class CashSelectionH2Impl : AbstractCashSelection() { // 2) H2 uses session variables to perform this accumulator function: // http://www.h2database.com/html/functions.html#set // 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries) - override fun executeQuery(connection: Connection, amount: Amount, lockId: UUID, notary: Party?, - onlyFromIssuerParties: Set, withIssuerRefs: Set) : ResultSet { - connection.createStatement().execute("CALL SET(@t, CAST(0 AS BIGINT));") + override fun executeQuery(connection: Connection, amount: Amount, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set, withIssuerRefs: Set, withResultSet: (ResultSet) -> Boolean): Boolean { + connection.createStatement().use { it.execute("CALL SET(@t, CAST(0 AS BIGINT));") } val selectJoin = """ SELECT vs.transaction_id, vs.output_index, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id @@ -50,19 +49,22 @@ class CashSelectionH2Impl : AbstractCashSelection() { " AND ccs.issuer_ref IN (?)" else "") // Use prepared statement for protection against SQL Injection (http://www.h2database.com/html/advanced.html#sql_injection) - val psSelectJoin = connection.prepareStatement(selectJoin) - var pIndex = 0 - psSelectJoin.setString(++pIndex, amount.token.currencyCode) - psSelectJoin.setLong(++pIndex, amount.quantity) - psSelectJoin.setString(++pIndex, lockId.toString()) - if (notary != null) - psSelectJoin.setString(++pIndex, notary.name.toString()) - if (onlyFromIssuerParties.isNotEmpty()) - psSelectJoin.setObject(++pIndex, onlyFromIssuerParties.map { it.owningKey.toStringShort() as Any}.toTypedArray() ) - if (withIssuerRefs.isNotEmpty()) - psSelectJoin.setObject(++pIndex, withIssuerRefs.map { it.bytes as Any }.toTypedArray()) - log.debug { psSelectJoin.toString() } + connection.prepareStatement(selectJoin).use { psSelectJoin -> + var pIndex = 0 + psSelectJoin.setString(++pIndex, amount.token.currencyCode) + psSelectJoin.setLong(++pIndex, amount.quantity) + psSelectJoin.setString(++pIndex, lockId.toString()) + if (notary != null) + psSelectJoin.setString(++pIndex, notary.name.toString()) + if (onlyFromIssuerParties.isNotEmpty()) + psSelectJoin.setObject(++pIndex, onlyFromIssuerParties.map { it.owningKey.toStringShort() as Any }.toTypedArray()) + if (withIssuerRefs.isNotEmpty()) + psSelectJoin.setObject(++pIndex, withIssuerRefs.map { it.bytes as Any }.toTypedArray()) + log.debug { psSelectJoin.toString() } - return psSelectJoin.executeQuery() + psSelectJoin.executeQuery().use { rs -> + return withResultSet(rs) + } + } } } \ No newline at end of file diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionMySQLImpl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionMySQLImpl.kt index 853ba23d07..e197e5a962 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionMySQLImpl.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionMySQLImpl.kt @@ -19,7 +19,7 @@ class CashSelectionMySQLImpl : AbstractCashSelection() { return metadata.driverName == JDBC_DRIVER_NAME } - override fun executeQuery(statement: Connection, amount: Amount, lockId: UUID, notary: Party?, issuerKeysStr: Set, issuerRefsStr: Set): ResultSet { + override fun executeQuery(statement: Connection, amount: Amount, lockId: UUID, notary: Party?, issuerKeysStr: Set, issuerRefsStr: Set, withResultSet: (ResultSet) -> Boolean): Boolean { TODO("MySQL cash selection not implemented") } diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt index f96ef6f441..47e59386c3 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt @@ -27,8 +27,7 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() { // 2) The window function accumulated column (`total`) does not include the current row (starts from 0) and cannot // appear in the WHERE clause, hence restricting row selection and adjusting the returned total in the outer query. // 3) Currently (version 9.6), FOR UPDATE cannot be specified with window functions - override fun executeQuery(connection: Connection, amount: Amount, lockId: UUID, notary: Party?, - onlyFromIssuerParties: Set, withIssuerRefs: Set) : ResultSet { + override fun executeQuery(connection: Connection, amount: Amount, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set, withIssuerRefs: Set, withResultSet: (ResultSet) -> Boolean): Boolean { val selectJoin = """SELECT nested.transaction_id, nested.output_index, nested.pennies, nested.total+nested.pennies as total_pennies, nested.lock_id FROM @@ -51,29 +50,32 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() { nested WHERE nested.total < ? """ - val statement = connection.prepareStatement(selectJoin) - statement.setString(1, amount.token.toString()) - statement.setString(2, lockId.toString()) - var paramOffset = 0 - if (notary != null) { - statement.setString(3, notary.name.toString()) - paramOffset += 1 - } - if (onlyFromIssuerParties.isNotEmpty()) { - val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map - { it.owningKey.toBase58String() }.toTypedArray()) - statement.setArray(3 + paramOffset, issuerKeys) - paramOffset += 1 - } - if (withIssuerRefs.isNotEmpty()) { - val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map - { it.bytes }.toTypedArray()) - statement.setArray(3 + paramOffset, issuerRefs) - paramOffset += 1 - } - statement.setLong(3 + paramOffset, amount.quantity) - log.debug { statement.toString() } + connection.prepareStatement(selectJoin).use { statement -> + statement.setString(1, amount.token.toString()) + statement.setString(2, lockId.toString()) + var paramOffset = 0 + if (notary != null) { + statement.setString(3, notary.name.toString()) + paramOffset += 1 + } + if (onlyFromIssuerParties.isNotEmpty()) { + val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map + { it.owningKey.toBase58String() }.toTypedArray()) + statement.setArray(3 + paramOffset, issuerKeys) + paramOffset += 1 + } + if (withIssuerRefs.isNotEmpty()) { + val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map + { it.bytes }.toTypedArray()) + statement.setArray(3 + paramOffset, issuerRefs) + paramOffset += 1 + } + statement.setLong(3 + paramOffset, amount.quantity) + log.debug { statement.toString() } - return statement.executeQuery() + statement.executeQuery().use { rs -> + return withResultSet(rs) + } + } } }