mirror of
https://github.com/corda/corda.git
synced 2025-06-10 19:31:46 +00:00
Experimental support for PostgreSQL (#1525)
* Cash selection refactoring such that 3d party DB providers are only required to implement Coin Selection SQL logic. * Re-added debug logging statement. * Updated to include PR review feedback from VK * Refactoring following rebase from master. * Fix broken JUnits following rebase. * Use JDBC ResultSet getBlob() and added custom serializer to address concern raised by tomtau in PR. * Fix failing JUnits. * Experimental support for PostgreSQL: CashSelection done using window functions * Moved postgresql version information into corda/build.gradle * Using a PreparedStatement in CashSelectionPostgreSQLImpl * Changed the PostgreSQL Cash Selection implementation to use the new refactored AbstractCashSelection
This commit is contained in:
parent
2c84d07e8e
commit
342090db62
@ -41,6 +41,7 @@ buildscript {
|
|||||||
ext.jansi_version = '1.14'
|
ext.jansi_version = '1.14'
|
||||||
ext.hibernate_version = '5.2.6.Final'
|
ext.hibernate_version = '5.2.6.Final'
|
||||||
ext.h2_version = '1.4.194' // Update docs if renamed or removed.
|
ext.h2_version = '1.4.194' // Update docs if renamed or removed.
|
||||||
|
ext.postgresql_version = '42.1.4'
|
||||||
ext.rxjava_version = '1.2.4'
|
ext.rxjava_version = '1.2.4'
|
||||||
ext.dokka_version = '0.9.14'
|
ext.dokka_version = '0.9.14'
|
||||||
ext.eddsa_version = '0.2.0'
|
ext.eddsa_version = '0.2.0'
|
||||||
|
@ -9,6 +9,8 @@ UNRELEASED
|
|||||||
* ``OpaqueBytes.bytes`` now returns a clone of its underlying ``ByteArray``, and has been redeclared as ``final``.
|
* ``OpaqueBytes.bytes`` now returns a clone of its underlying ``ByteArray``, and has been redeclared as ``final``.
|
||||||
This is a minor change to the public API, but is required to ensure that classes like ``SecureHash`` are immutable.
|
This is a minor change to the public API, but is required to ensure that classes like ``SecureHash`` are immutable.
|
||||||
|
|
||||||
|
* Experimental support for PostgreSQL: CashSelection done using window functions
|
||||||
|
|
||||||
* ``FlowLogic`` now exposes a series of function called ``receiveAll(...)`` allowing to join ``receive(...)`` instructions.
|
* ``FlowLogic`` now exposes a series of function called ``receiveAll(...)`` allowing to join ``receive(...)`` instructions.
|
||||||
|
|
||||||
* Renamed "plugins" directory on nodes to "cordapps"
|
* Renamed "plugins" directory on nodes to "cordapps"
|
||||||
|
@ -0,0 +1,83 @@
|
|||||||
|
package net.corda.finance.contracts.asset.cash.selection
|
||||||
|
|
||||||
|
import net.corda.core.contracts.Amount
|
||||||
|
import net.corda.core.identity.AbstractParty
|
||||||
|
import net.corda.core.identity.Party
|
||||||
|
import net.corda.core.utilities.OpaqueBytes
|
||||||
|
import net.corda.core.utilities.debug
|
||||||
|
import net.corda.core.utilities.loggerFor
|
||||||
|
import net.corda.core.utilities.toBase58String
|
||||||
|
import java.sql.Connection
|
||||||
|
import java.sql.DatabaseMetaData
|
||||||
|
import java.sql.ResultSet
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
|
class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
val JDBC_DRIVER_NAME = "PostgreSQL JDBC Driver"
|
||||||
|
val log = loggerFor<CashSelectionPostgreSQLImpl>()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun isCompatible(metadata: DatabaseMetaData): Boolean {
|
||||||
|
return metadata.driverName == JDBC_DRIVER_NAME
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun toString() = "${this::class.java} for $JDBC_DRIVER_NAME"
|
||||||
|
|
||||||
|
// This is using PostgreSQL window functions for selecting a minimum set of rows that match a request amount of coins:
|
||||||
|
// 1) This may also be possible with user-defined functions (e.g. using PL/pgSQL)
|
||||||
|
// 2) The window function accumulated column (`total`) does not include the current row (starts from 0) and cannot
|
||||||
|
// appear in the WHERE clause, hence restricting row selection and adjusting the returned total in the outer query.
|
||||||
|
// 3) Currently (version 9.6), FOR UPDATE cannot be specified with window functions
|
||||||
|
override fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
|
||||||
|
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>) : ResultSet {
|
||||||
|
val selectJoin = """SELECT nested.transaction_id, nested.output_index, nested.contract_state, nested.pennies,
|
||||||
|
nested.total+nested.pennies as total_pennies, nested.lock_id
|
||||||
|
FROM
|
||||||
|
(SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies,
|
||||||
|
coalesce((SUM(ccs.pennies) OVER (PARTITION BY 1 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)), 0)
|
||||||
|
AS total, vs.lock_id
|
||||||
|
FROM vault_states AS vs, contract_cash_states AS ccs
|
||||||
|
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
|
||||||
|
AND vs.state_status = 0
|
||||||
|
AND ccs.ccy_code = ?
|
||||||
|
AND (vs.lock_id = ? OR vs.lock_id is null)
|
||||||
|
""" +
|
||||||
|
(if (notary != null)
|
||||||
|
" AND vs.notary_name = ?" else "") +
|
||||||
|
(if (onlyFromIssuerParties.isNotEmpty())
|
||||||
|
" AND ccs.issuer_key = ANY (?)" else "") +
|
||||||
|
(if (withIssuerRefs.isNotEmpty())
|
||||||
|
" AND ccs.issuer_ref = ANY (?)" else "") +
|
||||||
|
""")
|
||||||
|
nested WHERE nested.total < ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
val statement = connection.prepareStatement(selectJoin)
|
||||||
|
statement.setString(1, amount.token.toString())
|
||||||
|
statement.setString(2, lockId.toString())
|
||||||
|
var paramOffset = 0
|
||||||
|
if (notary != null) {
|
||||||
|
statement.setString(3, notary.name.toString())
|
||||||
|
paramOffset += 1
|
||||||
|
}
|
||||||
|
if (onlyFromIssuerParties.isNotEmpty()) {
|
||||||
|
val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map
|
||||||
|
{ it.owningKey.toBase58String() }.toTypedArray())
|
||||||
|
statement.setArray(3 + paramOffset, issuerKeys)
|
||||||
|
paramOffset += 1
|
||||||
|
}
|
||||||
|
if (withIssuerRefs.isNotEmpty()) {
|
||||||
|
val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map
|
||||||
|
{ it.bytes }.toTypedArray())
|
||||||
|
statement.setArray(3 + paramOffset, issuerRefs)
|
||||||
|
paramOffset += 1
|
||||||
|
}
|
||||||
|
statement.setLong(3 + paramOffset, amount.quantity)
|
||||||
|
log.debug { statement.toString() }
|
||||||
|
|
||||||
|
return statement.executeQuery()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,2 +1,3 @@
|
|||||||
net.corda.finance.contracts.asset.cash.selection.CashSelectionH2Impl
|
net.corda.finance.contracts.asset.cash.selection.CashSelectionH2Impl
|
||||||
net.corda.finance.contracts.asset.cash.selection.CashSelectionMySQLImpl
|
net.corda.finance.contracts.asset.cash.selection.CashSelectionMySQLImpl
|
||||||
|
net.corda.finance.contracts.asset.cash.selection.CashSelectionPostgreSQLImpl
|
@ -137,6 +137,9 @@ dependencies {
|
|||||||
// For H2 database support in persistence
|
// For H2 database support in persistence
|
||||||
compile "com.h2database:h2:$h2_version"
|
compile "com.h2database:h2:$h2_version"
|
||||||
|
|
||||||
|
// For Postgres database support in persistence
|
||||||
|
compile "org.postgresql:postgresql:$postgresql_version"
|
||||||
|
|
||||||
// SQL connection pooling library
|
// SQL connection pooling library
|
||||||
compile "com.zaxxer:HikariCP:2.5.1"
|
compile "com.zaxxer:HikariCP:2.5.1"
|
||||||
|
|
||||||
|
@ -93,6 +93,7 @@ class HibernateConfiguration(val schemaService: SchemaService, private val datab
|
|||||||
// during schema creation / update.
|
// during schema creation / update.
|
||||||
class NodeDatabaseConnectionProvider : ConnectionProvider {
|
class NodeDatabaseConnectionProvider : ConnectionProvider {
|
||||||
override fun closeConnection(conn: Connection) {
|
override fun closeConnection(conn: Connection) {
|
||||||
|
conn.autoCommit = false
|
||||||
val tx = DatabaseTransactionManager.current()
|
val tx = DatabaseTransactionManager.current()
|
||||||
tx.commit()
|
tx.commit()
|
||||||
tx.close()
|
tx.close()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user