Pluggable cash selection using H2 as default implementation. (#1300)

* Pluggable cash selection using H2 as default implementation.

* Refactor to use own CashSelection service loader and associated isCompatible() check.

* Determine Cash Selection algorithm to use based on JDBC Driver loaded.
Lazily load JDBC Driver metadata and class implementation.

* Rebased and adjusted package naming to include `finance`

* Rebased and adjusted package naming to include `finance`

* Added some documentation.
Minor fixes and changes following PR review.

* Return cashSelectionAlgo.
Throw exception rather than setting in atomically referenced object.
This commit is contained in:
josecoll 2017-08-22 18:09:23 +01:00 committed by GitHub
parent dce17ed272
commit 458d2e32c0
7 changed files with 254 additions and 133 deletions

View File

@ -6,6 +6,7 @@ from the previous milestone release.
UNRELEASED
----------
* Cash selection algorithm is now pluggable (with H2 being the default implementation)
* Removed usage of Requery ORM library (repalced with JPA/Hibernate)

View File

@ -23,6 +23,11 @@ Cash shares a common superclass, ``OnLedgerAsset``, with the Commodity contract.
assets which can be issued, moved and exited on chain, with the subclasses handling asset-specific data types and
behaviour.
.. note:: Corda supports a pluggable cash selection algorithm by implementing the ``CashSelection`` interface.
The default implementation uses an H2 specific query that can be overridden for different database providers.
Please see ``CashSelectionH2Impl`` and its associated declaration in
``META-INF\services\net.corda.finance.contracts.asset.CashSelection``
Commodity
---------

View File

@ -2,9 +2,8 @@
package net.corda.finance.contracts.asset
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.finance.contracts.asset.cash.selection.CashSelectionH2Impl
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.crypto.testing.NULL_PARTY
import net.corda.core.crypto.toBase58String
@ -12,18 +11,12 @@ import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.Emoji
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.toHexString
import net.corda.core.utilities.toNonEmptySet
import net.corda.core.utilities.trace
import net.corda.finance.schemas.CashSchemaV1
import net.corda.finance.utils.sumCash
import net.corda.finance.utils.sumCashOrNull
@ -31,10 +24,9 @@ import net.corda.finance.utils.sumCashOrZero
import org.bouncycastle.asn1.x500.X500Name
import java.math.BigInteger
import java.security.PublicKey
import java.sql.SQLException
import java.sql.DatabaseMetaData
import java.util.*
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import java.util.concurrent.atomic.AtomicReference
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
@ -44,6 +36,61 @@ import kotlin.concurrent.withLock
// Just a fake program identifier for now. In a real system it could be, for instance, the hash of the program bytecode.
val CASH_PROGRAM_ID = Cash()
/**
* Pluggable interface to allow for different cash selection provider implementations
* Default implementation [CashSelectionH2Impl] uses H2 database and a custom function within H2 to perform aggregation.
* Custom implementations must implement this interface and declare their implementation in
* META-INF/services/net.corda.contracts.asset.CashSelection
*/
interface CashSelection {
companion object {
val instance = AtomicReference<CashSelection>()
fun getInstance(metadata: () -> java.sql.DatabaseMetaData): CashSelection {
return instance.get() ?: {
val _metadata = metadata()
val cashSelectionAlgos = ServiceLoader.load(CashSelection::class.java).toList()
val cashSelectionAlgo = cashSelectionAlgos.firstOrNull { it.isCompatible(_metadata) }
cashSelectionAlgo?.let {
instance.set(cashSelectionAlgo)
cashSelectionAlgo
} ?: throw ClassNotFoundException("\nUnable to load compatible cash selection algorithm implementation for JDBC driver ($_metadata)." +
"\nPlease specify an implementation in META-INF/services/net.corda.finance.contracts.asset.CashSelection")
}.invoke()
}
}
/**
* Upon dynamically loading configured Cash Selection algorithms declared in META-INF/services
* this method determines whether the loaded implementation is compatible and usable with the currently
* loaded JDBC driver.
* Note: the first loaded implementation to pass this check will be used at run-time.
*/
fun isCompatible(metadata: DatabaseMetaData): Boolean
/**
* Query to gather Cash states that are available
* @param services The service hub to allow access to the database session
* @param amount The amount of currency desired (ignoring issues, but specifying the currency)
* @param onlyFromIssuerParties If empty the operation ignores the specifics of the issuer,
* otherwise the set of eligible states wil be filtered to only include those from these issuers.
* @param notary If null the notary source is ignored, if specified then only states marked
* with this notary are included.
* @param lockId The FlowLogic.runId.uuid of the flow, which is used to soft reserve the states.
* Also, previous outputs of the flow will be eligible as they are implicitly locked with this id until the flow completes.
* @param withIssuerRefs If not empty the specific set of issuer references to match against.
* @return The matching states that were found. If sufficient funds were found these will be locked,
* otherwise what is available is returned unlocked for informational purposes.
*/
@Suspendable
fun unconsumedCashStatesForSpending(services: ServiceHub,
amount: Amount<Currency>,
onlyFromIssuerParties: Set<AbstractParty> = emptySet(),
notary: Party? = null,
lockId: UUID,
withIssuerRefs: Set<OpaqueBytes> = emptySet()): List<StateAndRef<Cash.State>>
}
/**
* A cash transaction may split and merge money represented by a set of (issuer, depositRef) pairs, across multiple
* input and output states. Imagine a Bitcoin transaction but in which all UTXOs had a colour
@ -217,10 +264,6 @@ class Cash : OnLedgerAsset<Currency, Cash.Commands, Cash.State>() {
}
companion object {
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
private val MAX_RETRIES = 5
private val RETRY_SLEEP = 100
private val spendLock: ReentrantLock = ReentrantLock()
/**
* Generate a transaction that moves an amount of currency to the given pubkey.
*
@ -252,126 +295,12 @@ class Cash : OnLedgerAsset<Currency, Cash.Commands, Cash.State>() {
= txState.copy(data = txState.data.copy(amount = amt, owner = owner))
// Retrieve unspent and unlocked cash states that meet our spending criteria.
val acceptableCoins = Cash.unconsumedCashStatesForSpending(services, amount, onlyFromParties, tx.notary, tx.lockId)
val acceptableCoins = CashSelection.getInstance({services.jdbcSession().metaData}).unconsumedCashStatesForSpending(services, amount, onlyFromParties, tx.notary, tx.lockId)
return OnLedgerAsset.generateSpend(tx, amount, to, acceptableCoins,
{ state, quantity, owner -> deriveState(state, quantity, owner) },
{ Cash().generateMoveCommand() })
}
/**
* An optimised query to gather Cash states that are available and retry if they are temporarily unavailable.
* @param services The service hub to allow access to the database session
* @param amount The amount of currency desired (ignoring issues, but specifying the currency)
* @param onlyFromIssuerParties If empty the operation ignores the specifics of the issuer,
* otherwise the set of eligible states wil be filtered to only include those from these issuers.
* @param notary If null the notary source is ignored, if specified then only states marked
* with this notary are included.
* @param lockId The FlowLogic.runId.uuid of the flow, which is used to soft reserve the states.
* Also, previous outputs of the flow will be eligible as they are implicitly locked with this id until the flow completes.
* @param withIssuerRefs If not empty the specific set of issuer references to match against.
* @return The matching states that were found. If sufficient funds were found these will be locked,
* otherwise what is available is returned unlocked for informational purposes.
*/
@JvmStatic
@Suspendable
fun unconsumedCashStatesForSpending(services: ServiceHub,
amount: Amount<Currency>,
onlyFromIssuerParties: Set<AbstractParty> = emptySet(),
notary: Party? = null,
lockId: UUID,
withIssuerRefs: Set<OpaqueBytes> = emptySet()): List<StateAndRef<Cash.State>> {
val issuerKeysStr = onlyFromIssuerParties.fold("") { left, right -> left + "('${right.owningKey.toBase58String()}')," }.dropLast(1)
val issuerRefsStr = withIssuerRefs.fold("") { left, right -> left + "('${right.bytes.toHexString()}')," }.dropLast(1)
val stateAndRefs = mutableListOf<StateAndRef<Cash.State>>()
// TODO: Need to provide a database provider independent means of performing this function.
// We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins:
// 1) There is no standard SQL mechanism of calculating a cumulative total on a field and restricting row selection on the
// running total of such an accumulator
// 2) H2 uses session variables to perform this accumulator function:
// http://www.h2database.com/html/functions.html#set
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
for (retryCount in 1..MAX_RETRIES) {
spendLock.withLock {
val statement = services.jdbcSession().createStatement()
try {
statement.execute("CALL SET(@t, 0);")
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
// the softLockReserve update will detect whether we try to lock states locked by others
val selectJoin = """
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
FROM vault_states AS vs, contract_cash_states AS ccs
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
AND vs.state_status = 0
AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity}
AND (vs.lock_id = '$lockId' OR vs.lock_id is null)
""" +
(if (notary != null)
" AND vs.notary_name = '${notary.name}'" else "") +
(if (onlyFromIssuerParties.isNotEmpty())
" AND ccs.issuer_key IN ($issuerKeysStr)" else "") +
(if (withIssuerRefs.isNotEmpty())
" AND ccs.issuer_ref IN ($issuerRefsStr)" else "")
// Retrieve spendable state refs
val rs = statement.executeQuery(selectJoin)
stateAndRefs.clear()
log.debug(selectJoin)
var totalPennies = 0L
while (rs.next()) {
val txHash = SecureHash.parse(rs.getString(1))
val index = rs.getInt(2)
val stateRef = StateRef(txHash, index)
val state = rs.getBytes(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
val pennies = rs.getLong(4)
totalPennies = rs.getLong(5)
val rowLockId = rs.getString(6)
stateAndRefs.add(StateAndRef(state, stateRef))
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
}
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
// we should have a minimum number of states to satisfy our selection `amount` criteria
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
// With the current single threaded state machine available states are guaranteed to lock.
// TODO However, we will have to revisit these methods in the future multi-threaded.
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
return stateAndRefs
}
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
// retry as more states may become available
} catch (e: SQLException) {
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
$e.
""")
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
stateAndRefs.clear()
log.warn(e.message)
// retry only if there are locked states that may become available again (or consumed with change)
} finally {
statement.close()
}
}
log.warn("Coin selection failed on attempt $retryCount")
// TODO: revisit the back off strategy for contended spending.
if (retryCount != MAX_RETRIES) {
Strand.sleep(RETRY_SLEEP * retryCount.toLong())
}
}
log.warn("Insufficient spendable states identified for $amount")
return stateAndRefs
}
}
}
// Small DSL extensions.

View File

@ -0,0 +1,151 @@
package net.corda.finance.contracts.asset.cash.selection
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.core.contracts.Amount
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toBase58String
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.*
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.asset.CashSelection
import java.sql.DatabaseMetaData
import java.sql.SQLException
import java.util.*
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
class CashSelectionH2Impl : CashSelection {
companion object {
val JDBC_DRIVER_NAME = "H2 JDBC Driver"
val log = loggerFor<CashSelectionH2Impl>()
}
override fun isCompatible(metaData: DatabaseMetaData): Boolean {
return metaData.driverName == JDBC_DRIVER_NAME
}
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
private val MAX_RETRIES = 5
private val RETRY_SLEEP = 100
private val spendLock: ReentrantLock = ReentrantLock()
/**
* An optimised query to gather Cash states that are available and retry if they are temporarily unavailable.
* @param services The service hub to allow access to the database session
* @param amount The amount of currency desired (ignoring issues, but specifying the currency)
* @param onlyFromIssuerParties If empty the operation ignores the specifics of the issuer,
* otherwise the set of eligible states wil be filtered to only include those from these issuers.
* @param notary If null the notary source is ignored, if specified then only states marked
* with this notary are included.
* @param lockId The FlowLogic.runId.uuid of the flow, which is used to soft reserve the states.
* Also, previous outputs of the flow will be eligible as they are implicitly locked with this id until the flow completes.
* @param withIssuerRefs If not empty the specific set of issuer references to match against.
* @return The matching states that were found. If sufficient funds were found these will be locked,
* otherwise what is available is returned unlocked for informational purposes.
*/
@Suspendable
override fun unconsumedCashStatesForSpending(services: ServiceHub,
amount: Amount<Currency>,
onlyFromIssuerParties: Set<AbstractParty>,
notary: Party?,
lockId: UUID,
withIssuerRefs: Set<OpaqueBytes>): List<StateAndRef<Cash.State>> {
val issuerKeysStr = onlyFromIssuerParties.fold("") { left, right -> left + "('${right.owningKey.toBase58String()}')," }.dropLast(1)
val issuerRefsStr = withIssuerRefs.fold("") { left, right -> left + "('${right.bytes.toHexString()}')," }.dropLast(1)
val stateAndRefs = mutableListOf<StateAndRef<Cash.State>>()
// We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins:
// 1) There is no standard SQL mechanism of calculating a cumulative total on a field and restricting row selection on the
// running total of such an accumulator
// 2) H2 uses session variables to perform this accumulator function:
// http://www.h2database.com/html/functions.html#set
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
for (retryCount in 1..MAX_RETRIES) {
spendLock.withLock {
val statement = services.jdbcSession().createStatement()
try {
statement.execute("CALL SET(@t, 0);")
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
// the softLockReserve update will detect whether we try to lock states locked by others
val selectJoin = """
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
FROM vault_states AS vs, contract_cash_states AS ccs
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
AND vs.state_status = 0
AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity}
AND (vs.lock_id = '$lockId' OR vs.lock_id is null)
""" +
(if (notary != null)
" AND vs.notary_name = '${notary.name}'" else "") +
(if (onlyFromIssuerParties.isNotEmpty())
" AND ccs.issuer_key IN ($issuerKeysStr)" else "") +
(if (withIssuerRefs.isNotEmpty())
" AND ccs.issuer_ref IN ($issuerRefsStr)" else "")
// Retrieve spendable state refs
val rs = statement.executeQuery(selectJoin)
stateAndRefs.clear()
log.debug(selectJoin)
var totalPennies = 0L
while (rs.next()) {
val txHash = SecureHash.parse(rs.getString(1))
val index = rs.getInt(2)
val stateRef = StateRef(txHash, index)
val state = rs.getBytes(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
val pennies = rs.getLong(4)
totalPennies = rs.getLong(5)
val rowLockId = rs.getString(6)
stateAndRefs.add(StateAndRef(state, stateRef))
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
}
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
// we should have a minimum number of states to satisfy our selection `amount` criteria
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
// With the current single threaded state machine available states are guaranteed to lock.
// TODO However, we will have to revisit these methods in the future multi-threaded.
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
return stateAndRefs
}
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
// retry as more states may become available
} catch (e: SQLException) {
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
$e.
""")
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
stateAndRefs.clear()
log.warn(e.message)
// retry only if there are locked states that may become available again (or consumed with change)
} finally {
statement.close()
}
}
log.warn("Coin selection failed on attempt $retryCount")
// TODO: revisit the back off strategy for contended spending.
if (retryCount != MAX_RETRIES) {
Strand.sleep(RETRY_SLEEP * retryCount.toLong())
}
}
log.warn("Insufficient spendable states identified for $amount")
return stateAndRefs
}
}

View File

@ -0,0 +1,32 @@
package net.corda.finance.contracts.asset.cash.selection
import net.corda.core.contracts.Amount
import net.corda.core.contracts.StateAndRef
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.utilities.OpaqueBytes
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.asset.CashSelection
import java.sql.DatabaseMetaData
import java.util.*
class CashSelectionMySQLImpl : CashSelection {
companion object {
val JDBC_DRIVER_NAME = "MySQL JDBC Driver"
}
override fun isCompatible(metadata: DatabaseMetaData): Boolean {
return metadata.driverName == JDBC_DRIVER_NAME
}
override fun unconsumedCashStatesForSpending(services: ServiceHub,
amount: Amount<Currency>,
onlyFromIssuerParties: Set<AbstractParty>,
notary: Party?,
lockId: UUID,
withIssuerRefs: Set<OpaqueBytes>): List<StateAndRef<Cash.State>> {
TODO("MySQL cash selection not implemented")
}
}

View File

@ -14,6 +14,7 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.asset.CashSelection
import net.corda.finance.issuedBy
import java.util.*
@ -25,7 +26,7 @@ import java.util.*
* issuer.
*/
@StartableByRPC
class CashExitFlow(val amount: Amount<Currency>, val issueRef: OpaqueBytes, progressTracker: ProgressTracker) : AbstractCashFlow<AbstractCashFlow.Result>(progressTracker) {
class CashExitFlow(val amount: Amount<Currency>, val issuerRef: OpaqueBytes, progressTracker: ProgressTracker) : AbstractCashFlow<AbstractCashFlow.Result>(progressTracker) {
constructor(amount: Amount<Currency>, issueRef: OpaqueBytes) : this(amount, issueRef, tracker())
constructor(request: ExitRequest) : this(request.amount, request.issueRef, tracker())
@ -42,8 +43,8 @@ class CashExitFlow(val amount: Amount<Currency>, val issueRef: OpaqueBytes, prog
override fun call(): AbstractCashFlow.Result {
progressTracker.currentStep = GENERATING_TX
val builder = TransactionBuilder(notary = null as Party?)
val issuer = serviceHub.myInfo.legalIdentity.ref(issueRef)
val exitStates = Cash.unconsumedCashStatesForSpending(serviceHub, amount, setOf(issuer.party), builder.notary, builder.lockId, setOf(issuer.reference))
val issuer = serviceHub.myInfo.legalIdentity.ref(issuerRef)
val exitStates = CashSelection.getInstance({serviceHub.jdbcSession().metaData}).unconsumedCashStatesForSpending(serviceHub, amount, setOf(issuer.party), builder.notary, builder.lockId, setOf(issuer.reference))
val signers = try {
Cash().generateExit(
builder,

View File

@ -0,0 +1,2 @@
net.corda.finance.contracts.asset.cash.selection.CashSelectionH2Impl
net.corda.finance.contracts.asset.cash.selection.CashSelectionMySQLImpl