mirror of
https://github.com/corda/corda.git
synced 2025-02-22 10:10:59 +00:00
Remove mutex and stop catching deadlock / SQLExceptions. (#3242)
SQL Exceptions thrown with Cash Selection are now retried via Flow Hospital
This commit is contained in:
parent
78c759e2e6
commit
accb9eb5b3
@ -16,11 +16,8 @@ import net.corda.finance.contracts.asset.Cash
|
|||||||
import java.sql.Connection
|
import java.sql.Connection
|
||||||
import java.sql.DatabaseMetaData
|
import java.sql.DatabaseMetaData
|
||||||
import java.sql.ResultSet
|
import java.sql.ResultSet
|
||||||
import java.sql.SQLException
|
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
|
||||||
import kotlin.concurrent.withLock
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pluggable interface to allow for different cash selection provider implementations
|
* Pluggable interface to allow for different cash selection provider implementations
|
||||||
@ -54,7 +51,6 @@ abstract class AbstractCashSelection {
|
|||||||
private val MAX_RETRIES = 8
|
private val MAX_RETRIES = 8
|
||||||
private val RETRY_SLEEP = 100
|
private val RETRY_SLEEP = 100
|
||||||
private val RETRY_CAP = 2000
|
private val RETRY_CAP = 2000
|
||||||
private val spendLock: ReentrantLock = ReentrantLock()
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upon dynamically loading configured Cash Selection algorithms declared in META-INF/services
|
* Upon dynamically loading configured Cash Selection algorithms declared in META-INF/services
|
||||||
@ -126,54 +122,48 @@ abstract class AbstractCashSelection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun attemptSpend(services: ServiceHub, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, stateAndRefs: MutableList<StateAndRef<Cash.State>>): Boolean {
|
private fun attemptSpend(services: ServiceHub, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, stateAndRefs: MutableList<StateAndRef<Cash.State>>): Boolean {
|
||||||
spendLock.withLock {
|
val connection = services.jdbcSession()
|
||||||
val connection = services.jdbcSession()
|
try {
|
||||||
try {
|
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
||||||
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
// the softLockReserve update will detect whether we try to lock states locked by others
|
||||||
// the softLockReserve update will detect whether we try to lock states locked by others
|
return executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) { rs ->
|
||||||
return executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) { rs ->
|
stateAndRefs.clear()
|
||||||
stateAndRefs.clear()
|
|
||||||
|
|
||||||
var totalPennies = 0L
|
var totalPennies = 0L
|
||||||
val stateRefs = mutableSetOf<StateRef>()
|
val stateRefs = mutableSetOf<StateRef>()
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
val txHash = SecureHash.parse(rs.getString(1))
|
val txHash = SecureHash.parse(rs.getString(1))
|
||||||
val index = rs.getInt(2)
|
val index = rs.getInt(2)
|
||||||
val pennies = rs.getLong(3)
|
val pennies = rs.getLong(3)
|
||||||
totalPennies = rs.getLong(4)
|
totalPennies = rs.getLong(4)
|
||||||
val rowLockId = rs.getString(5)
|
val rowLockId = rs.getString(5)
|
||||||
stateRefs.add(StateRef(txHash, index))
|
stateRefs.add(StateRef(txHash, index))
|
||||||
log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" }
|
log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" }
|
||||||
}
|
|
||||||
|
|
||||||
if (stateRefs.isNotEmpty()) {
|
|
||||||
// TODO: future implementation to retrieve contract states from a Vault BLOB store
|
|
||||||
stateAndRefs.addAll(uncheckedCast(services.loadStates(stateRefs)))
|
|
||||||
}
|
|
||||||
|
|
||||||
val success = stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity
|
|
||||||
if (success) {
|
|
||||||
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
|
||||||
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
|
||||||
|
|
||||||
// With the current single threaded state machine available states are guaranteed to lock.
|
|
||||||
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
|
||||||
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
|
||||||
} else {
|
|
||||||
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
|
||||||
}
|
|
||||||
success
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// retry as more states may become available
|
if (stateRefs.isNotEmpty()) {
|
||||||
} catch (e: SQLException) {
|
// TODO: future implementation to retrieve contract states from a Vault BLOB store
|
||||||
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
|
stateAndRefs.addAll(uncheckedCast(services.loadStates(stateRefs)))
|
||||||
$e.
|
}
|
||||||
""")
|
|
||||||
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
|
val success = stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity
|
||||||
log.warn(e.message)
|
if (success) {
|
||||||
// retry only if there are locked states that may become available again (or consumed with change)
|
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
||||||
|
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
||||||
|
|
||||||
|
// With the current single threaded state machine available states are guaranteed to lock.
|
||||||
|
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
||||||
|
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
||||||
|
} else {
|
||||||
|
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
||||||
|
}
|
||||||
|
success
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// retry as more states may become available
|
||||||
|
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
|
||||||
|
log.warn(e.message)
|
||||||
|
// retry only if there are locked states that may become available again (or consumed with change)
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user