mirror of
https://github.com/corda/corda.git
synced 2025-05-29 13:44:25 +00:00
Improved Cash Selection interface (#1858)
* Cash selection refactoring such that 3d party DB providers are only required to implement Coin Selection SQL logic. * Re-added debug logging statement. * Updated to include PR review feedback from VK * Refactoring following rebase from master. * Fix broken JUnits following rebase. * Use JDBC ResultSet getBlob() and added custom serializer to address concern raised by tomtau in PR. * Fix failing JUnits.
This commit is contained in:
parent
38cf4a489e
commit
2a68e23e69
@ -6,6 +6,7 @@ import net.corda.core.internal.WriteOnceProperty
|
|||||||
import net.corda.core.utilities.ByteSequence
|
import net.corda.core.utilities.ByteSequence
|
||||||
import net.corda.core.utilities.OpaqueBytes
|
import net.corda.core.utilities.OpaqueBytes
|
||||||
import net.corda.core.utilities.sequence
|
import net.corda.core.utilities.sequence
|
||||||
|
import java.sql.Blob
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An abstraction for serializing and deserializing objects, with support for versioning of the wire format via
|
* An abstraction for serializing and deserializing objects, with support for versioning of the wire format via
|
||||||
@ -185,6 +186,11 @@ inline fun <reified T : Any> SerializedBytes<T>.deserialize(serializationFactory
|
|||||||
*/
|
*/
|
||||||
inline fun <reified T : Any> ByteArray.deserialize(serializationFactory: SerializationFactory = SerializationFactory.defaultFactory, context: SerializationContext = serializationFactory.defaultContext): T = this.sequence().deserialize(serializationFactory, context)
|
inline fun <reified T : Any> ByteArray.deserialize(serializationFactory: SerializationFactory = SerializationFactory.defaultFactory, context: SerializationContext = serializationFactory.defaultContext): T = this.sequence().deserialize(serializationFactory, context)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convenience extension method for deserializing a JDBC Blob, utilising the defaults.
|
||||||
|
*/
|
||||||
|
inline fun <reified T : Any> Blob.deserialize(serializationFactory: SerializationFactory = SerializationFactory.defaultFactory, context: SerializationContext = serializationFactory.defaultContext): T = this.getBytes(1, this.length().toInt()).deserialize(serializationFactory, context)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convenience extension method for serializing an object of type T, utilising the defaults.
|
* Convenience extension method for serializing an object of type T, utilising the defaults.
|
||||||
*/
|
*/
|
||||||
|
@ -19,79 +19,21 @@ import net.corda.core.schemas.PersistentState
|
|||||||
import net.corda.core.schemas.QueryableState
|
import net.corda.core.schemas.QueryableState
|
||||||
import net.corda.core.transactions.LedgerTransaction
|
import net.corda.core.transactions.LedgerTransaction
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
import net.corda.core.utilities.OpaqueBytes
|
|
||||||
import net.corda.core.utilities.toBase58String
|
import net.corda.core.utilities.toBase58String
|
||||||
import net.corda.finance.contracts.asset.cash.selection.CashSelectionH2Impl
|
import net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection
|
||||||
import net.corda.finance.schemas.CashSchemaV1
|
import net.corda.finance.schemas.CashSchemaV1
|
||||||
import net.corda.finance.utils.sumCash
|
import net.corda.finance.utils.sumCash
|
||||||
import net.corda.finance.utils.sumCashOrNull
|
import net.corda.finance.utils.sumCashOrNull
|
||||||
import net.corda.finance.utils.sumCashOrZero
|
import net.corda.finance.utils.sumCashOrZero
|
||||||
import java.math.BigInteger
|
import java.math.BigInteger
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
import java.sql.DatabaseMetaData
|
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
//
|
//
|
||||||
// Cash
|
// Cash
|
||||||
//
|
//
|
||||||
|
|
||||||
/**
|
|
||||||
* Pluggable interface to allow for different cash selection provider implementations
|
|
||||||
* Default implementation [CashSelectionH2Impl] uses H2 database and a custom function within H2 to perform aggregation.
|
|
||||||
* Custom implementations must implement this interface and declare their implementation in
|
|
||||||
* META-INF/services/net.corda.contracts.asset.CashSelection
|
|
||||||
*/
|
|
||||||
interface CashSelection {
|
|
||||||
companion object {
|
|
||||||
val instance = AtomicReference<CashSelection>()
|
|
||||||
|
|
||||||
fun getInstance(metadata: () -> java.sql.DatabaseMetaData): CashSelection {
|
|
||||||
return instance.get() ?: {
|
|
||||||
val _metadata = metadata()
|
|
||||||
val cashSelectionAlgos = ServiceLoader.load(CashSelection::class.java).toList()
|
|
||||||
val cashSelectionAlgo = cashSelectionAlgos.firstOrNull { it.isCompatible(_metadata) }
|
|
||||||
cashSelectionAlgo?.let {
|
|
||||||
instance.set(cashSelectionAlgo)
|
|
||||||
cashSelectionAlgo
|
|
||||||
} ?: throw ClassNotFoundException("\nUnable to load compatible cash selection algorithm implementation for JDBC driver ($_metadata)." +
|
|
||||||
"\nPlease specify an implementation in META-INF/services/net.corda.finance.contracts.asset.CashSelection")
|
|
||||||
}.invoke()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Upon dynamically loading configured Cash Selection algorithms declared in META-INF/services
|
|
||||||
* this method determines whether the loaded implementation is compatible and usable with the currently
|
|
||||||
* loaded JDBC driver.
|
|
||||||
* Note: the first loaded implementation to pass this check will be used at run-time.
|
|
||||||
*/
|
|
||||||
fun isCompatible(metadata: DatabaseMetaData): Boolean
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Query to gather Cash states that are available
|
|
||||||
* @param services The service hub to allow access to the database session
|
|
||||||
* @param amount The amount of currency desired (ignoring issues, but specifying the currency)
|
|
||||||
* @param onlyFromIssuerParties If empty the operation ignores the specifics of the issuer,
|
|
||||||
* otherwise the set of eligible states wil be filtered to only include those from these issuers.
|
|
||||||
* @param notary If null the notary source is ignored, if specified then only states marked
|
|
||||||
* with this notary are included.
|
|
||||||
* @param lockId The FlowLogic.runId.uuid of the flow, which is used to soft reserve the states.
|
|
||||||
* Also, previous outputs of the flow will be eligible as they are implicitly locked with this id until the flow completes.
|
|
||||||
* @param withIssuerRefs If not empty the specific set of issuer references to match against.
|
|
||||||
* @return The matching states that were found. If sufficient funds were found these will be locked,
|
|
||||||
* otherwise what is available is returned unlocked for informational purposes.
|
|
||||||
*/
|
|
||||||
@Suspendable
|
|
||||||
fun unconsumedCashStatesForSpending(services: ServiceHub,
|
|
||||||
amount: Amount<Currency>,
|
|
||||||
onlyFromIssuerParties: Set<AbstractParty> = emptySet(),
|
|
||||||
notary: Party? = null,
|
|
||||||
lockId: UUID,
|
|
||||||
withIssuerRefs: Set<OpaqueBytes> = emptySet()): List<StateAndRef<Cash.State>>
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A cash transaction may split and merge money represented by a set of (issuer, depositRef) pairs, across multiple
|
* A cash transaction may split and merge money represented by a set of (issuer, depositRef) pairs, across multiple
|
||||||
* input and output states. Imagine a Bitcoin transaction but in which all UTXOs had a colour
|
* input and output states. Imagine a Bitcoin transaction but in which all UTXOs had a colour
|
||||||
@ -384,7 +326,7 @@ class Cash : OnLedgerAsset<Currency, Cash.Commands, Cash.State>() {
|
|||||||
|
|
||||||
// Retrieve unspent and unlocked cash states that meet our spending criteria.
|
// Retrieve unspent and unlocked cash states that meet our spending criteria.
|
||||||
val totalAmount = payments.map { it.amount }.sumOrThrow()
|
val totalAmount = payments.map { it.amount }.sumOrThrow()
|
||||||
val cashSelection = CashSelection.getInstance({ services.jdbcSession().metaData })
|
val cashSelection = AbstractCashSelection.getInstance({ services.jdbcSession().metaData })
|
||||||
val acceptableCoins = cashSelection.unconsumedCashStatesForSpending(services, totalAmount, onlyFromParties, tx.notary, tx.lockId)
|
val acceptableCoins = cashSelection.unconsumedCashStatesForSpending(services, totalAmount, onlyFromParties, tx.notary, tx.lockId)
|
||||||
val revocationEnabled = false // Revocation is currently unsupported
|
val revocationEnabled = false // Revocation is currently unsupported
|
||||||
// Generate a new identity that change will be sent to for confidentiality purposes. This means that a
|
// Generate a new identity that change will be sent to for confidentiality purposes. This means that a
|
||||||
|
@ -0,0 +1,168 @@
|
|||||||
|
package net.corda.finance.contracts.asset.cash.selection
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import net.corda.core.contracts.Amount
|
||||||
|
import net.corda.core.contracts.StateAndRef
|
||||||
|
import net.corda.core.contracts.StateRef
|
||||||
|
import net.corda.core.contracts.TransactionState
|
||||||
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.identity.AbstractParty
|
||||||
|
import net.corda.core.identity.Party
|
||||||
|
import net.corda.core.node.ServiceHub
|
||||||
|
import net.corda.core.node.services.StatesNotAvailableException
|
||||||
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
|
import net.corda.core.serialization.deserialize
|
||||||
|
import net.corda.core.utilities.*
|
||||||
|
import net.corda.finance.contracts.asset.Cash
|
||||||
|
import java.sql.*
|
||||||
|
import java.util.*
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
|
import kotlin.concurrent.withLock
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pluggable interface to allow for different cash selection provider implementations
|
||||||
|
* Default implementation [CashSelectionH2Impl] uses H2 database and a custom function within H2 to perform aggregation.
|
||||||
|
* Custom implementations must implement this interface and declare their implementation in
|
||||||
|
* META-INF/services/net.corda.contracts.asset.CashSelection
|
||||||
|
*/
|
||||||
|
abstract class AbstractCashSelection {
|
||||||
|
companion object {
|
||||||
|
val instance = AtomicReference<AbstractCashSelection>()
|
||||||
|
|
||||||
|
fun getInstance(metadata: () -> java.sql.DatabaseMetaData): AbstractCashSelection {
|
||||||
|
return instance.get() ?: {
|
||||||
|
val _metadata = metadata()
|
||||||
|
val cashSelectionAlgos = ServiceLoader.load(AbstractCashSelection::class.java).toList()
|
||||||
|
val cashSelectionAlgo = cashSelectionAlgos.firstOrNull { it.isCompatible(_metadata) }
|
||||||
|
cashSelectionAlgo?.let {
|
||||||
|
instance.set(cashSelectionAlgo)
|
||||||
|
cashSelectionAlgo
|
||||||
|
} ?: throw ClassNotFoundException("\nUnable to load compatible cash selection algorithm implementation for JDBC driver ($_metadata)." +
|
||||||
|
"\nPlease specify an implementation in META-INF/services/${AbstractCashSelection::class.java}")
|
||||||
|
}.invoke()
|
||||||
|
}
|
||||||
|
|
||||||
|
val log = loggerFor<AbstractCashSelection>()
|
||||||
|
}
|
||||||
|
|
||||||
|
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
|
||||||
|
// TODO: make parameters configurable when we get CorDapp configuration.
|
||||||
|
private val MAX_RETRIES = 8
|
||||||
|
private val RETRY_SLEEP = 100
|
||||||
|
private val RETRY_CAP = 2000
|
||||||
|
private val spendLock: ReentrantLock = ReentrantLock()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Upon dynamically loading configured Cash Selection algorithms declared in META-INF/services
|
||||||
|
* this method determines whether the loaded implementation is compatible and usable with the currently
|
||||||
|
* loaded JDBC driver.
|
||||||
|
* Note: the first loaded implementation to pass this check will be used at run-time.
|
||||||
|
*/
|
||||||
|
abstract fun isCompatible(metadata: DatabaseMetaData): Boolean
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A vendor specific query(ies) to gather Cash states that are available.
|
||||||
|
* @param statement The service hub to allow access to the database session
|
||||||
|
* @param amount The amount of currency desired (ignoring issues, but specifying the currency)
|
||||||
|
* @param lockId The FlowLogic.runId.uuid of the flow, which is used to soft reserve the states.
|
||||||
|
* Also, previous outputs of the flow will be eligible as they are implicitly locked with this id until the flow completes.
|
||||||
|
* @param notary If null the notary source is ignored, if specified then only states marked
|
||||||
|
* with this notary are included.
|
||||||
|
* @param onlyFromIssuerParties Optional issuer parties to match against.
|
||||||
|
* @param withIssuerRefs Optional issuer references to match against.
|
||||||
|
* @return JDBC ResultSet with the matching states that were found. If sufficient funds were found these will be locked,
|
||||||
|
* otherwise what is available is returned unlocked for informational purposes.
|
||||||
|
*/
|
||||||
|
abstract fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
|
||||||
|
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>) : ResultSet
|
||||||
|
|
||||||
|
override abstract fun toString() : String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Query to gather Cash states that are available and retry if they are temporarily unavailable.
|
||||||
|
* @param services The service hub to allow access to the database session
|
||||||
|
* @param amount The amount of currency desired (ignoring issues, but specifying the currency)
|
||||||
|
* @param onlyFromIssuerParties If empty the operation ignores the specifics of the issuer,
|
||||||
|
* otherwise the set of eligible states wil be filtered to only include those from these issuers.
|
||||||
|
* @param notary If null the notary source is ignored, if specified then only states marked
|
||||||
|
* with this notary are included.
|
||||||
|
* @param lockId The FlowLogic.runId.uuid of the flow, which is used to soft reserve the states.
|
||||||
|
* Also, previous outputs of the flow will be eligible as they are implicitly locked with this id until the flow completes.
|
||||||
|
* @param withIssuerRefs If not empty the specific set of issuer references to match against.
|
||||||
|
* @return The matching states that were found. If sufficient funds were found these will be locked,
|
||||||
|
* otherwise what is available is returned unlocked for informational purposes.
|
||||||
|
*/
|
||||||
|
@Suspendable
|
||||||
|
fun unconsumedCashStatesForSpending(services: ServiceHub,
|
||||||
|
amount: Amount<Currency>,
|
||||||
|
onlyFromIssuerParties: Set<AbstractParty> = emptySet(),
|
||||||
|
notary: Party? = null,
|
||||||
|
lockId: UUID,
|
||||||
|
withIssuerRefs: Set<OpaqueBytes> = emptySet()): List<StateAndRef<Cash.State>> {
|
||||||
|
val stateAndRefs = mutableListOf<StateAndRef<Cash.State>>()
|
||||||
|
|
||||||
|
for (retryCount in 1..MAX_RETRIES) {
|
||||||
|
if (!attemptSpend(services, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs, stateAndRefs)) {
|
||||||
|
log.warn("Coin selection failed on attempt $retryCount")
|
||||||
|
// TODO: revisit the back off strategy for contended spending.
|
||||||
|
if (retryCount != MAX_RETRIES) {
|
||||||
|
stateAndRefs.clear()
|
||||||
|
val durationMillis = (minOf(RETRY_SLEEP.shl(retryCount), RETRY_CAP / 2) * (1.0 + Math.random())).toInt()
|
||||||
|
FlowLogic.sleep(durationMillis.millis)
|
||||||
|
} else {
|
||||||
|
log.warn("Insufficient spendable states identified for $amount")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return stateAndRefs
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun attemptSpend(services: ServiceHub, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, stateAndRefs: MutableList<StateAndRef<Cash.State>>): Boolean {
|
||||||
|
spendLock.withLock {
|
||||||
|
val connection = services.jdbcSession()
|
||||||
|
try {
|
||||||
|
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
||||||
|
// the softLockReserve update will detect whether we try to lock states locked by others
|
||||||
|
val rs = executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs)
|
||||||
|
stateAndRefs.clear()
|
||||||
|
|
||||||
|
var totalPennies = 0L
|
||||||
|
while (rs.next()) {
|
||||||
|
val txHash = SecureHash.parse(rs.getString(1))
|
||||||
|
val index = rs.getInt(2)
|
||||||
|
val stateRef = StateRef(txHash, index)
|
||||||
|
val state = rs.getBlob(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
|
||||||
|
val pennies = rs.getLong(4)
|
||||||
|
totalPennies = rs.getLong(5)
|
||||||
|
val rowLockId = rs.getString(6)
|
||||||
|
stateAndRefs.add(StateAndRef(state, stateRef))
|
||||||
|
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
|
||||||
|
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
||||||
|
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
||||||
|
|
||||||
|
// With the current single threaded state machine available states are guaranteed to lock.
|
||||||
|
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
||||||
|
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
||||||
|
// retry as more states may become available
|
||||||
|
} catch (e: SQLException) {
|
||||||
|
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
|
||||||
|
$e.
|
||||||
|
""")
|
||||||
|
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
|
||||||
|
log.warn(e.message)
|
||||||
|
// retry only if there are locked states that may become available again (or consumed with change)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
@ -1,28 +1,15 @@
|
|||||||
package net.corda.finance.contracts.asset.cash.selection
|
package net.corda.finance.contracts.asset.cash.selection
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
|
||||||
import net.corda.core.contracts.Amount
|
import net.corda.core.contracts.Amount
|
||||||
import net.corda.core.contracts.StateAndRef
|
|
||||||
import net.corda.core.contracts.StateRef
|
|
||||||
import net.corda.core.contracts.TransactionState
|
|
||||||
import net.corda.core.crypto.SecureHash
|
|
||||||
import net.corda.core.flows.FlowLogic
|
|
||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.node.ServiceHub
|
|
||||||
import net.corda.core.node.services.StatesNotAvailableException
|
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
|
||||||
import net.corda.core.serialization.deserialize
|
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
import net.corda.finance.contracts.asset.Cash
|
import java.sql.Connection
|
||||||
import net.corda.finance.contracts.asset.CashSelection
|
|
||||||
import java.sql.DatabaseMetaData
|
import java.sql.DatabaseMetaData
|
||||||
import java.sql.SQLException
|
import java.sql.ResultSet
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
|
||||||
import kotlin.concurrent.withLock
|
|
||||||
|
|
||||||
class CashSelectionH2Impl : CashSelection {
|
class CashSelectionH2Impl : AbstractCashSelection() {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val JDBC_DRIVER_NAME = "H2 JDBC Driver"
|
const val JDBC_DRIVER_NAME = "H2 JDBC Driver"
|
||||||
@ -33,53 +20,8 @@ class CashSelectionH2Impl : CashSelection {
|
|||||||
return metadata.driverName == JDBC_DRIVER_NAME
|
return metadata.driverName == JDBC_DRIVER_NAME
|
||||||
}
|
}
|
||||||
|
|
||||||
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
|
override fun toString() = "${this::class.java} for $JDBC_DRIVER_NAME"
|
||||||
private val MAX_RETRIES = 8
|
|
||||||
private val RETRY_SLEEP = 100
|
|
||||||
private val RETRY_CAP = 2000
|
|
||||||
private val spendLock: ReentrantLock = ReentrantLock()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An optimised query to gather Cash states that are available and retry if they are temporarily unavailable.
|
|
||||||
* @param services The service hub to allow access to the database session
|
|
||||||
* @param amount The amount of currency desired (ignoring issues, but specifying the currency)
|
|
||||||
* @param onlyFromIssuerParties If empty the operation ignores the specifics of the issuer,
|
|
||||||
* otherwise the set of eligible states wil be filtered to only include those from these issuers.
|
|
||||||
* @param notary If null the notary source is ignored, if specified then only states marked
|
|
||||||
* with this notary are included.
|
|
||||||
* @param lockId The FlowLogic.runId.uuid of the flow, which is used to soft reserve the states.
|
|
||||||
* Also, previous outputs of the flow will be eligible as they are implicitly locked with this id until the flow completes.
|
|
||||||
* @param withIssuerRefs If not empty the specific set of issuer references to match against.
|
|
||||||
* @return The matching states that were found. If sufficient funds were found these will be locked,
|
|
||||||
* otherwise what is available is returned unlocked for informational purposes.
|
|
||||||
*/
|
|
||||||
@Suspendable
|
|
||||||
override fun unconsumedCashStatesForSpending(services: ServiceHub,
|
|
||||||
amount: Amount<Currency>,
|
|
||||||
onlyFromIssuerParties: Set<AbstractParty>,
|
|
||||||
notary: Party?,
|
|
||||||
lockId: UUID,
|
|
||||||
withIssuerRefs: Set<OpaqueBytes>): List<StateAndRef<Cash.State>> {
|
|
||||||
|
|
||||||
val stateAndRefs = mutableListOf<StateAndRef<Cash.State>>()
|
|
||||||
|
|
||||||
for (retryCount in 1..MAX_RETRIES) {
|
|
||||||
if (!attemptSpend(services, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs, stateAndRefs)) {
|
|
||||||
log.warn("Coin selection failed on attempt $retryCount")
|
|
||||||
// TODO: revisit the back off strategy for contended spending.
|
|
||||||
if (retryCount != MAX_RETRIES) {
|
|
||||||
stateAndRefs.clear()
|
|
||||||
val durationMillis = (minOf(RETRY_SLEEP.shl(retryCount), RETRY_CAP / 2) * (1.0 + Math.random())).toInt()
|
|
||||||
FlowLogic.sleep(durationMillis.millis)
|
|
||||||
} else {
|
|
||||||
log.warn("Insufficient spendable states identified for $amount")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return stateAndRefs
|
|
||||||
}
|
|
||||||
|
|
||||||
// We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins:
|
// We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins:
|
||||||
// 1) There is no standard SQL mechanism of calculating a cumulative total on a field and restricting row selection on the
|
// 1) There is no standard SQL mechanism of calculating a cumulative total on a field and restricting row selection on the
|
||||||
@ -87,15 +29,11 @@ class CashSelectionH2Impl : CashSelection {
|
|||||||
// 2) H2 uses session variables to perform this accumulator function:
|
// 2) H2 uses session variables to perform this accumulator function:
|
||||||
// http://www.h2database.com/html/functions.html#set
|
// http://www.h2database.com/html/functions.html#set
|
||||||
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
|
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
|
||||||
|
override fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
|
||||||
|
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>) : ResultSet {
|
||||||
|
connection.createStatement().execute("CALL SET(@t, 0);")
|
||||||
|
|
||||||
private fun attemptSpend(services: ServiceHub, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, stateAndRefs: MutableList<StateAndRef<Cash.State>>): Boolean {
|
val selectJoin = """
|
||||||
val connection = services.jdbcSession()
|
|
||||||
spendLock.withLock {
|
|
||||||
val statement = connection.createStatement()
|
|
||||||
try {
|
|
||||||
statement.execute("CALL SET(@t, CAST(0 AS BIGINT));")
|
|
||||||
|
|
||||||
val selectJoin = """
|
|
||||||
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
|
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
|
||||||
FROM vault_states AS vs, contract_cash_states AS ccs
|
FROM vault_states AS vs, contract_cash_states AS ccs
|
||||||
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
|
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
|
||||||
@ -103,65 +41,27 @@ class CashSelectionH2Impl : CashSelection {
|
|||||||
AND ccs.ccy_code = ? and @t < ?
|
AND ccs.ccy_code = ? and @t < ?
|
||||||
AND (vs.lock_id = ? OR vs.lock_id is null)
|
AND (vs.lock_id = ? OR vs.lock_id is null)
|
||||||
""" +
|
""" +
|
||||||
(if (notary != null)
|
(if (notary != null)
|
||||||
" AND vs.notary_name = ?" else "") +
|
" AND vs.notary_name = ?" else "") +
|
||||||
(if (onlyFromIssuerParties.isNotEmpty())
|
(if (onlyFromIssuerParties.isNotEmpty())
|
||||||
" AND ccs.issuer_key IN (?)" else "") +
|
" AND ccs.issuer_key IN (?)" else "") +
|
||||||
(if (withIssuerRefs.isNotEmpty())
|
(if (withIssuerRefs.isNotEmpty())
|
||||||
" AND ccs.issuer_ref IN (?)" else "")
|
" AND ccs.issuer_ref IN (?)" else "")
|
||||||
|
|
||||||
// Use prepared statement for protection against SQL Injection (http://www.h2database.com/html/advanced.html#sql_injection)
|
// Use prepared statement for protection against SQL Injection (http://www.h2database.com/html/advanced.html#sql_injection)
|
||||||
val psSelectJoin = connection.prepareStatement(selectJoin)
|
val psSelectJoin = connection.prepareStatement(selectJoin)
|
||||||
var pIndex = 0
|
var pIndex = 0
|
||||||
psSelectJoin.setString(++pIndex, amount.token.currencyCode)
|
psSelectJoin.setString(++pIndex, amount.token.currencyCode)
|
||||||
psSelectJoin.setLong(++pIndex, amount.quantity)
|
psSelectJoin.setLong(++pIndex, amount.quantity)
|
||||||
psSelectJoin.setString(++pIndex, lockId.toString())
|
psSelectJoin.setString(++pIndex, lockId.toString())
|
||||||
if (notary != null)
|
if (notary != null)
|
||||||
psSelectJoin.setString(++pIndex, notary.name.toString())
|
psSelectJoin.setString(++pIndex, notary.name.toString())
|
||||||
if (onlyFromIssuerParties.isNotEmpty())
|
if (onlyFromIssuerParties.isNotEmpty())
|
||||||
psSelectJoin.setObject(++pIndex, onlyFromIssuerParties.map { it.owningKey.toBase58String() as Any}.toTypedArray() )
|
psSelectJoin.setObject(++pIndex, onlyFromIssuerParties.map { it.owningKey.toBase58String() as Any}.toTypedArray() )
|
||||||
if (withIssuerRefs.isNotEmpty())
|
if (withIssuerRefs.isNotEmpty())
|
||||||
psSelectJoin.setObject(++pIndex, withIssuerRefs.map { it.bytes.toHexString() as Any }.toTypedArray())
|
psSelectJoin.setObject(++pIndex, withIssuerRefs.map { it.bytes.toHexString() as Any }.toTypedArray())
|
||||||
log.debug { psSelectJoin.toString() }
|
log.debug { psSelectJoin.toString() }
|
||||||
|
|
||||||
// Retrieve spendable state refs
|
return psSelectJoin.executeQuery()
|
||||||
val rs = psSelectJoin.executeQuery()
|
|
||||||
stateAndRefs.clear()
|
|
||||||
var totalPennies = 0L
|
|
||||||
while (rs.next()) {
|
|
||||||
val txHash = SecureHash.parse(rs.getString(1))
|
|
||||||
val index = rs.getInt(2)
|
|
||||||
val stateRef = StateRef(txHash, index)
|
|
||||||
val state = rs.getBytes(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
|
|
||||||
val pennies = rs.getLong(4)
|
|
||||||
totalPennies = rs.getLong(5)
|
|
||||||
val rowLockId = rs.getString(6)
|
|
||||||
stateAndRefs.add(StateAndRef(state, stateRef))
|
|
||||||
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
|
|
||||||
}
|
|
||||||
|
|
||||||
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
|
|
||||||
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
|
||||||
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
|
||||||
|
|
||||||
// With the current single threaded state machine available states are guaranteed to lock.
|
|
||||||
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
|
||||||
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
|
||||||
// retry as more states may become available
|
|
||||||
} catch (e: SQLException) {
|
|
||||||
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
|
|
||||||
$e.
|
|
||||||
""")
|
|
||||||
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
|
|
||||||
log.warn(e.message)
|
|
||||||
// retry only if there are locked states that may become available again (or consumed with change)
|
|
||||||
} finally {
|
|
||||||
statement.close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,17 +1,15 @@
|
|||||||
package net.corda.finance.contracts.asset.cash.selection
|
package net.corda.finance.contracts.asset.cash.selection
|
||||||
|
|
||||||
import net.corda.core.contracts.Amount
|
import net.corda.core.contracts.Amount
|
||||||
import net.corda.core.contracts.StateAndRef
|
|
||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.node.ServiceHub
|
|
||||||
import net.corda.core.utilities.OpaqueBytes
|
import net.corda.core.utilities.OpaqueBytes
|
||||||
import net.corda.finance.contracts.asset.Cash
|
import java.sql.Connection
|
||||||
import net.corda.finance.contracts.asset.CashSelection
|
|
||||||
import java.sql.DatabaseMetaData
|
import java.sql.DatabaseMetaData
|
||||||
|
import java.sql.ResultSet
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
class CashSelectionMySQLImpl : CashSelection {
|
class CashSelectionMySQLImpl : AbstractCashSelection() {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val JDBC_DRIVER_NAME = "MySQL JDBC Driver"
|
const val JDBC_DRIVER_NAME = "MySQL JDBC Driver"
|
||||||
@ -21,12 +19,9 @@ class CashSelectionMySQLImpl : CashSelection {
|
|||||||
return metadata.driverName == JDBC_DRIVER_NAME
|
return metadata.driverName == JDBC_DRIVER_NAME
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun unconsumedCashStatesForSpending(services: ServiceHub,
|
override fun executeQuery(statement: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, issuerKeysStr: Set<AbstractParty>, issuerRefsStr: Set<OpaqueBytes>): ResultSet {
|
||||||
amount: Amount<Currency>,
|
|
||||||
onlyFromIssuerParties: Set<AbstractParty>,
|
|
||||||
notary: Party?,
|
|
||||||
lockId: UUID,
|
|
||||||
withIssuerRefs: Set<OpaqueBytes>): List<StateAndRef<Cash.State>> {
|
|
||||||
TODO("MySQL cash selection not implemented")
|
TODO("MySQL cash selection not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun toString() = "${this::class.java} for ${CashSelectionH2Impl.JDBC_DRIVER_NAME}"
|
||||||
}
|
}
|
@ -14,7 +14,7 @@ import net.corda.core.transactions.TransactionBuilder
|
|||||||
import net.corda.core.utilities.OpaqueBytes
|
import net.corda.core.utilities.OpaqueBytes
|
||||||
import net.corda.core.utilities.ProgressTracker
|
import net.corda.core.utilities.ProgressTracker
|
||||||
import net.corda.finance.contracts.asset.Cash
|
import net.corda.finance.contracts.asset.Cash
|
||||||
import net.corda.finance.contracts.asset.CashSelection
|
import net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection
|
||||||
import net.corda.finance.issuedBy
|
import net.corda.finance.issuedBy
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
@ -46,7 +46,7 @@ class CashExitFlow(private val amount: Amount<Currency>,
|
|||||||
progressTracker.currentStep = GENERATING_TX
|
progressTracker.currentStep = GENERATING_TX
|
||||||
val builder = TransactionBuilder(notary = null)
|
val builder = TransactionBuilder(notary = null)
|
||||||
val issuer = ourIdentity.ref(issuerRef)
|
val issuer = ourIdentity.ref(issuerRef)
|
||||||
val exitStates = CashSelection
|
val exitStates = AbstractCashSelection
|
||||||
.getInstance { serviceHub.jdbcSession().metaData }
|
.getInstance { serviceHub.jdbcSession().metaData }
|
||||||
.unconsumedCashStatesForSpending(serviceHub, amount, setOf(issuer.party), builder.notary, builder.lockId, setOf(issuer.reference))
|
.unconsumedCashStatesForSpending(serviceHub, amount, setOf(issuer.party), builder.notary, builder.lockId, setOf(issuer.reference))
|
||||||
val signers = try {
|
val signers = try {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user