Soft locking implementation using database coin selection

Fix broken IssuerFlowTest

Fix IssuerFlowTests after rebase.

Resolve conflicts after rebase.

Soft locking converted to use persistent store.
Added additional optional 'includeLockStates' parameter in VaultService states API call.
Added Vault softLocked states query API call.

Fixed commercial paper failing test.
Improved exception handling on soft locking UPDATE statement.

Using SELECT FOR UPDATE to ensure correct soft locking data visibility.
Db query operations moved out of mutex code (as locking managed by underlying DB)

Adjusted logging severity levels.

Adjusted logging severity levels.

GenerateSpending now performing fine grained query for unconsumed states by joining with contract_cash_states table.
Using H2 proprietary cummulative counting feature (using sessioni SET variables)
Refactored and simplified HibernateObserver constructor to enable usage in JUnit tests.

Event generator issues larger random amounts (10,000..1,000,000) to those than are spent (0..10,000)
Adjusted Issue (5:1) and Exit (10:1) generation frequency vs spending.

Minor fixes: added optional lockid into select for spending criteria, set notary, additional trace logging.

Generate Cash Schema by default upon node start-up (as part of NodeSchemaService initialisation).

Explicitly close JDBC statements in finally() blocks.

Tightened HibernateObserver constructor.

Fix CommercialPaper test (was missing auto-generation of CONTRACT_CASH table)

Revert default JVM size back to 200Mb.

Revert default number of iterations in Explorer Node Simulation mode (back to 10000 with .5 sec sleep interval).

Remove redundant setter function.

Added TODO messages indicating Requery / H2 restrictions & caveats.

Consumed states lock updates now performed in general consumed state Update.

Updated/added Soft Locking documentation.

Addressed initial PR comments: use THREAD_LOCAL_KRYO, use AbstractParty, extract helper method, improve readability, address some doc typos

Addressed PR comment: removed lockId from WireTransaction.

Fixed soft locking UPDATE statements.

Improvements to VaultSoftLockManager for auto-registration of soft locks for flows with spendable states (as notifications from vault).
Other optimisations (IssuerFlow no longer explicitly reserve/release issued state) and improvements (soft lock release management of soft locks, docs update)

Performance update: now using Requery for UPDATE in release soft locking (non-composite key statement)

Removed redundant TODO messages (TODO: revisit Kryo bug when using THREAD_LOCAL_KYRO)

Minor fixes following rebase

Fixed failing JUnit following rebase

Addressed MH PR review items (1st pass)

Fix broken JUnit

Significant changes to RDBMS operations within coin selection and soft locking as requested by PR review.
(Removed SELECT FOR UPDATE; added RETRY upon coin selection; reverting partial soft locks)

Addressed a number of PR review requests added by MH (comments/spelling, lockID instantiation, HibernateObserver instantiation, cash schema white-listing usage)

Addressed latest PR review comments from RP.

Minor fixes following rebase from master.

Fixed final failing JUnit (issuer flow concurrent).

Updated TraderDemo to trigger concurrent issuance of cash.

Fixed compiler warning on lockId null check.

Fixed subtle bug in coin selection intermittently surfaced in IntegrationTestTutorial.

Fixed small memory leak.

Removed stray } in logger trace message.

Slight rewording of description of Soft Locking in docs.

Renamed NoStatesAvailableException to StatesNotAvailableException.
generateSpend is now Suspendable (calls sleep method on flow upon coin selection retry).

Added companion function to enable a Strand to sleep but without locking transactional context.

Improved logging, changed to StateNotAvailableException, using Flow sleep upon retry, tweaked SELECT criteria in coin selection, fixed bug when insufficient states selectable, generateSpend is now @suspendable

Improved handling and logging of flow results in Simulation Mode.

Fixed minor error in sleep when not an active flow.

Retry coin selection when unavailable states (as these may become available as new states).
Additional debug logging to highlight and identify H2 coin selection sporadic bug.

Inlined sleep method due to intermittent Quasar error.

Re-introduce selection clause that prevents selection and temporary locking of already locked states (by other flows).
Improved trace logging for coin selection (SQL row level info).
Correctly calling FlowStateMachineImpl sleep (now inlined and working correctly)

Fixed rebase error.

Remove redundant TODO message.
This commit is contained in:
josecoll
2017-03-27 17:12:33 +01:00
committed by GitHub
parent 045efbf074
commit 0280299104
24 changed files with 1391 additions and 332 deletions

View File

@ -45,6 +45,7 @@ import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.*
import net.corda.node.services.vault.CashBalanceAsMetricsObserver
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase
@ -278,9 +279,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
private fun makeVaultObservers() {
VaultSoftLockManager(vault, smm)
CashBalanceAsMetricsObserver(services, database)
ScheduledActivityObserver(services)
HibernateObserver(services)
HibernateObserver(vault, schemas)
}
private fun makeInfo(): NodeInfo {

View File

@ -12,7 +12,7 @@ interface SchemaService {
/**
* Represents any options configured on the node for a schema.
*/
data class SchemaOptions(val databaseSchema: String?, val tablePrefix: String?)
data class SchemaOptions(val databaseSchema: String? = null, val tablePrefix: String? = null)
/**
* Options configured for this node's schemas. A missing entry for a schema implies all properties are null.

View File

@ -4,12 +4,13 @@ import kotlinx.support.jdk7.use
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.node.services.VaultService
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.schemas.QueryableState
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.SchemaService
import org.hibernate.SessionFactory
import org.hibernate.boot.model.naming.Identifier
import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
@ -25,17 +26,19 @@ import java.util.concurrent.ConcurrentHashMap
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
*/
// TODO: Manage version evolution of the schemas via additional tooling.
class HibernateObserver(services: ServiceHubInternal) {
class HibernateObserver(vaultService: VaultService, val schemaService: SchemaService) {
companion object {
val logger = loggerFor<HibernateObserver>()
}
val schemaService = services.schemaService
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
val sessionFactories = ConcurrentHashMap<MappedSchema, SessionFactory>()
init {
services.vaultService.rawUpdates.subscribe { persist(it.produced) }
schemaService.schemaOptions.map { it.key }.forEach {
makeSessionFactoryForSchema(it)
}
vaultService.rawUpdates.subscribe { persist(it.produced) }
}
private fun sessionFactoryForSchema(schema: MappedSchema): SessionFactory {

View File

@ -5,6 +5,7 @@ import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.services.api.SchemaService
import net.corda.schemas.CashSchemaV1
/**
* Most basic implementation of [SchemaService].
@ -12,10 +13,15 @@ import net.corda.node.services.api.SchemaService
* TODO: support loading schema options from node configuration.
* TODO: support configuring what schemas are to be selected for persistence.
* TODO: support plugins for schema version upgrading or custom mapping not supported by original [QueryableState].
* TODO: create whitelisted tables when a CorDapp is first installed
*/
class NodeSchemaService : SchemaService, SingletonSerializeAsToken() {
// Currently does not support configuring schema options.
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = emptyMap()
// Whitelisted tables are those required by internal Corda services
// For example, cash is used by the vault for coin selection
// This whitelist will grow as we add further functionality (eg. other fungible assets)
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = mapOf(Pair(CashSchemaV1, SchemaService.SchemaOptions()))
// Currently returns all schemas supported by the state, with no filtering or enrichment.
override fun selectSchemas(state: QueryableState): Iterable<MappedSchema> {

View File

@ -27,6 +27,7 @@ import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.sql.Connection
import java.sql.SQLException
import java.util.*
import java.util.concurrent.TimeUnit
@ -46,6 +47,24 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
* Return the current [FlowStateMachineImpl] or null if executing outside of one.
*/
fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*>
/**
* Provide a mechanism to sleep within a Strand without locking any transactional state
*/
// TODO: inlined due to an intermittent Quasar error (to be fully investigated)
@Suppress("NOTHING_TO_INLINE")
@Suspendable
inline fun sleep(millis: Long) {
if (currentStateMachine() != null) {
val db = StrandLocalTransactionManager.database
TransactionManager.current().commit()
TransactionManager.current().close()
Strand.sleep(millis)
StrandLocalTransactionManager.database = db
TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ)
}
else Strand.sleep(millis)
}
}
// These fields shouldn't be serialised, so they are marked @Transient.
@ -92,7 +111,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
// Check if the FlowException was propagated by looking at where the stack trace originates (see suspendAndExpectReceive).
val propagated = e.stackTrace[0].className == javaClass.name
processException(e, propagated)
logger.debug(if (propagated) "Flow ended due to receiving exception" else "Flow finished with exception", e)
logger.error(if (propagated) "Flow ended due to receiving exception" else "Flow finished with exception", e)
return
} catch (t: Throwable) {
recordDuration(startTime, success = false)

View File

@ -1,16 +1,22 @@
package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import io.requery.TransactionIsolation
import io.requery.kotlin.`in`
import io.requery.kotlin.eq
import io.requery.kotlin.isNull
import io.requery.kotlin.notNull
import net.corda.contracts.asset.Cash
import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed
import net.corda.core.contracts.*
import net.corda.core.crypto.AbstractParty
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.unconsumedStates
@ -24,13 +30,20 @@ import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.database.RequeryConfiguration
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.vault.schemas.*
import net.corda.node.utilities.StrandLocalTransactionManager
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
import rx.Observable
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.sql.Connection
import java.sql.SQLException
import java.util.*
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* Currently, the node vault service is a very simple RDBMS backed implementation. It will change significantly when
@ -60,74 +73,85 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
// For use during publishing only.
val updatesPublisher: rx.Observer<Vault.Update> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
})
fun recordUpdate(update: Vault.Update): Vault.Update {
if (update != Vault.NoUpdate) {
val producedStateRefs = update.produced.map { it.ref }
val producedStateRefsMap = update.produced.associateBy { it.ref }
val consumedStateRefs = update.consumed.map { it.ref }
log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." }
private fun recordUpdate(update: Vault.Update): Vault.Update {
if (update != Vault.NoUpdate) {
val producedStateRefs = update.produced.map { it.ref }
val producedStateRefsMap = update.produced.associateBy { it.ref }
val consumedStateRefs = update.consumed.map { it.ref }
log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." }
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
producedStateRefsMap.forEach { it ->
val state = VaultStatesEntity().apply {
txId = it.key.txhash.toString()
index = it.key.index
stateStatus = Vault.StateStatus.UNCONSUMED
contractStateClassName = it.value.state.data.javaClass.name
contractState = it.value.state.serialize(storageKryo()).bytes
notaryName = it.value.state.notary.name
notaryKey = it.value.state.notary.owningKey.toBase58String()
recordedTime = services.clock.instant()
}
insert(state)
}
// TODO: awaiting support of UPDATE WHERE <Composite key> IN in Requery DSL
consumedStateRefs.forEach { stateRef ->
val queryKey = io.requery.proxy.CompositeKey(mapOf(VaultStatesEntity.TX_ID to stateRef.txhash.toString(),
VaultStatesEntity.INDEX to stateRef.index))
val state = findByKey(VaultStatesEntity::class, queryKey)
state?.run {
stateStatus = Vault.StateStatus.CONSUMED
consumedTime = services.clock.instant()
// remove lock (if held)
if (lockId != null) {
lockId = null
lockUpdateTime = services.clock.instant()
log.trace("Releasing soft lock on consumed state: $stateRef")
}
update(state)
}
}
}
}
return update
}
// TODO: consider moving this logic outside the vault
// TODO: revisit the concurrency safety of this logic when we move beyond single threaded SMM.
// For example, we update currency totals in a non-deterministic order and so expose ourselves to deadlock.
private fun maybeUpdateCashBalances(update: Vault.Update) {
if (update.containsType<Cash.State>()) {
val consumed = sumCashStates(update.consumed)
val produced = sumCashStates(update.produced)
(produced.keys + consumed.keys).map { currency ->
val producedAmount = produced[currency] ?: Amount(0, currency)
val consumedAmount = consumed[currency] ?: Amount(0, currency)
val cashBalanceEntity = VaultCashBalancesEntity()
cashBalanceEntity.currency = currency.currencyCode
cashBalanceEntity.amount = producedAmount.quantity - consumedAmount.quantity
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
producedStateRefsMap.forEach { it ->
val state = VaultStatesEntity().apply {
txId = it.key.txhash.toString()
index = it.key.index
stateStatus = Vault.StateStatus.UNCONSUMED
contractStateClassName = it.value.state.data.javaClass.name
contractState = it.value.state.serialize(storageKryo()).bytes
notaryName = it.value.state.notary.name
notaryKey = it.value.state.notary.owningKey.toBase58String()
recordedTime = services.clock.instant()
}
insert(state)
}
consumedStateRefs.forEach { stateRef ->
val queryKey = io.requery.proxy.CompositeKey(mapOf(VaultStatesEntity.TX_ID to stateRef.txhash.toString(),
VaultStatesEntity.INDEX to stateRef.index))
val state = findByKey(VaultStatesEntity::class, queryKey)
state?.run {
stateStatus = Vault.StateStatus.CONSUMED
consumedTime = services.clock.instant()
update(state)
}
}
}
}
return update
}
// TODO: consider moving this logic outside the vault
fun maybeUpdateCashBalances(update: Vault.Update) {
if (update.containsType<Cash.State>()) {
val consumed = sumCashStates(update.consumed)
val produced = sumCashStates(update.produced)
(produced.keys + consumed.keys).map { currency ->
val producedAmount = produced[currency] ?: Amount(0, currency)
val consumedAmount = consumed[currency] ?: Amount(0, currency)
val cashBalanceEntity = VaultCashBalancesEntity()
cashBalanceEntity.currency = currency.currencyCode
cashBalanceEntity.amount = producedAmount.quantity - consumedAmount.quantity
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
val state = findByKey(VaultCashBalancesEntity::class, currency.currencyCode)
state?.run {
amount += producedAmount.quantity - consumedAmount.quantity
}
upsert(state ?: cashBalanceEntity)
val state = findByKey(VaultCashBalancesEntity::class, currency.currencyCode)
state?.run {
amount += producedAmount.quantity - consumedAmount.quantity
}
upsert(state ?: cashBalanceEntity)
val total = state?.amount ?: cashBalanceEntity.amount
log.trace{"Updating Cash balance for $currency by ${cashBalanceEntity.amount} pennies (total: $total)"}
}
}
}
}
@Suppress("UNCHECKED_CAST")
private fun sumCashStates(states: Iterable<StateAndRef<ContractState>>): Map<Currency, Amount<Currency>> {
return states.mapNotNull { (it.state.data as? FungibleAsset<Currency>)?.amount }
.groupBy { it.token.product }
.mapValues { it.value.map { Amount(it.quantity, it.token.product) }.sumOrThrow() }
}
})
@Suppress("UNCHECKED_CAST")
private fun sumCashStates(states: Iterable<StateAndRef<ContractState>>): Map<Currency, Amount<Currency>> {
return states.mapNotNull { (it.state.data as? FungibleAsset<Currency>)?.amount }
.groupBy { it.token.product }
.mapValues { it.value.map { Amount(it.quantity, it.token.product) }.sumOrThrow() }
}
override val cashBalances: Map<Currency, Amount<Currency>> get() {
val cashBalancesByCurrency =
@ -151,15 +175,17 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
}
}
override fun <T: ContractState> states(clazzes: Set<Class<T>>, statuses: EnumSet<Vault.StateStatus>): Iterable<StateAndRef<T>> {
override fun <T: ContractState> states(clazzes: Set<Class<T>>, statuses: EnumSet<Vault.StateStatus>, includeSoftLockedStates: Boolean): Iterable<StateAndRef<T>> {
val stateAndRefs =
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
var result = select(VaultSchema.VaultStates::class)
var query = select(VaultSchema.VaultStates::class)
.where(VaultSchema.VaultStates::stateStatus `in` statuses)
// TODO: temporary fix to continue supporting track() function (until becomes Typed)
if (!clazzes.map {it.name}.contains(ContractState::class.java.name))
result.and (VaultSchema.VaultStates::contractStateClassName `in` (clazzes.map { it.name }))
val iterator = result.get().iterator()
query.and (VaultSchema.VaultStates::contractStateClassName `in` (clazzes.map { it.name }))
if (!includeSoftLockedStates)
query.and(VaultSchema.VaultStates::lockId.isNull())
val iterator = query.get().iterator()
Sequence{iterator}
.map { it ->
val stateRef = StateRef(SecureHash.parse(it.txId), it.index)
@ -195,10 +221,13 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
val ourKeys = services.keyManagementService.keys.keys
val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, ourKeys) }
if (netDelta != Vault.NoUpdate) {
recordUpdate(netDelta)
maybeUpdateCashBalances(netDelta)
mutex.locked {
recordUpdate(netDelta)
maybeUpdateCashBalances(netDelta)
updatesPublisher.onNext(netDelta)
// flowId required by SoftLockManager to perform auto-registration of soft locks for new states
val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid
val vaultUpdate = if (uuid != null) netDelta.copy(flowId = uuid) else netDelta
updatesPublisher.onNext(vaultUpdate)
}
}
}
@ -218,6 +247,208 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
}
}
@Throws(StatesNotAvailableException::class)
override fun softLockReserve(id: UUID, stateRefs: Set<StateRef>) {
if (stateRefs.isNotEmpty()) {
val stateRefsAsStr = stateRefsToCompositeKeyStr(stateRefs.toList())
val softLockTimestamp = services.clock.instant()
// TODO: awaiting support of UPDATE WHERE <Composite key> IN in Requery DSL
val updateStatement = """
UPDATE VAULT_STATES SET lock_id = '$id', lock_timestamp = '$softLockTimestamp'
WHERE ((transaction_id, output_index) IN ($stateRefsAsStr))
AND (state_status = 0)
AND ((lock_id = '$id') OR (lock_id is null));
"""
val statement = configuration.jdbcSession().createStatement()
log.debug(updateStatement)
try {
val rs = statement.executeUpdate(updateStatement)
if (rs > 0 && rs == stateRefs.size) {
log.trace("Reserving soft lock states for $id: $stateRefs")
}
else {
// revert partial soft locks
val revertUpdateStatement = """
UPDATE VAULT_STATES SET lock_id = null
WHERE ((transaction_id, output_index) IN ($stateRefsAsStr))
AND (lock_timestamp = '$softLockTimestamp') AND (lock_id = '$id');
"""
log.debug(revertUpdateStatement)
val rsr = statement.executeUpdate(revertUpdateStatement)
if (rsr > 0) {
log.trace("Reverting $rsr partially soft locked states for $id")
}
throw StatesNotAvailableException("Attempted to reserve $stateRefs for $id but only $rs rows available")
}
}
catch (e: SQLException) {
log.error("""soft lock update error attempting to reserve states: $stateRefs for $id
$e.
""")
throw StatesNotAvailableException("Failed to reserve $stateRefs for $id", e)
}
finally { statement.close() }
}
}
override fun softLockRelease(id: UUID, stateRefs: Set<StateRef>?) {
if (stateRefs == null) {
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
val update = update(VaultStatesEntity::class)
.set(VaultStatesEntity.LOCK_ID, null)
.set(VaultStatesEntity.LOCK_UPDATE_TIME, services.clock.instant())
.where (VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
.and (VaultStatesEntity.LOCK_ID eq id.toString()).get()
if (update.value() > 0) {
log.trace("Releasing ${update.value()} soft locked states for $id")
}
}
}
else if (stateRefs.isNotEmpty()) {
val stateRefsAsStr = stateRefsToCompositeKeyStr(stateRefs.toList())
// TODO: awaiting support of UPDATE WHERE <Composite key> IN in Requery DSL
val updateStatement = """
UPDATE VAULT_STATES SET lock_id = null, lock_timestamp = '${services.clock.instant()}'
WHERE (transaction_id, output_index) IN ($stateRefsAsStr)
AND (state_status = 0) AND (lock_id = '$id');
"""
val statement = configuration.jdbcSession().createStatement()
log.debug(updateStatement)
try {
val rs = statement.executeUpdate(updateStatement)
if (rs > 0) {
log.trace("Releasing $rs soft locked states for $id and stateRefs $stateRefs")
}
} catch (e: SQLException) {
log.error("""soft lock update error attempting to release states for $id and $stateRefs")
$e.
""")
} finally {
statement.close()
}
}
}
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
val MAX_RETRIES = 5
val RETRY_SLEEP = 100
val spendLock: ReentrantLock = ReentrantLock()
@Suspendable
internal fun <T : ContractState> unconsumedStatesForSpending(amount: Amount<Currency>, onlyFromIssuerParties: Set<AbstractParty>? = null, notary: Party? = null, lockId: UUID): List<StateAndRef<T>> {
val issuerKeysStr = onlyFromIssuerParties?.fold("") { left, right -> left + "('${right.owningKey.toBase58String()}')," }?.dropLast(1)
var stateAndRefs = mutableListOf<StateAndRef<T>>()
// TODO: Need to provide a database provider independent means of performing this function.
// We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins:
// 1) There is no standard SQL mechanism of calculating a cumulative total on a field and restricting row selection on the
// running total of such an accumulator
// 2) H2 uses session variables to perform this accumulator function:
// http://www.h2database.com/html/functions.html#set
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
for (retryCount in 1..MAX_RETRIES) {
spendLock.withLock {
val statement = configuration.jdbcSession().createStatement()
try {
statement.execute("CALL SET(@t, 0);")
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
// the softLockReserve update will detect whether we try to lock states locked by others
val selectJoin = """
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
FROM vault_states AS vs, contract_cash_states AS ccs
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
AND vs.state_status = 0
AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity}
AND (vs.lock_id = '$lockId' OR vs.lock_id is null)
""" +
(if (notary != null)
" AND vs.notary_key = '${notary.owningKey.toBase58String()}'" else "") +
(if (issuerKeysStr != null)
" AND ccs.issuer_key IN $issuerKeysStr" else "")
// Retrieve spendable state refs
val rs = statement.executeQuery(selectJoin)
stateAndRefs.clear()
log.debug(selectJoin)
var totalPennies = 0L
while (rs.next()) {
val txHash = SecureHash.parse(rs.getString(1))
val index = rs.getInt(2)
val stateRef = StateRef(txHash, index)
val state = rs.getBytes(3).deserialize<TransactionState<T>>(storageKryo())
val pennies = rs.getLong(4)
totalPennies = rs.getLong(5)
val rowLockId = rs.getString(6)
stateAndRefs.add(StateAndRef(state, stateRef))
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
}
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
// we should have a minimum number of states to satisfy our selection `amount` criteria
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
// update database
softLockReserve(lockId, stateAndRefs.map { it.ref }.toSet())
return stateAndRefs
}
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
// retry as more states may become available
} catch (e: SQLException) {
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
$e.
""")
} catch (e: StatesNotAvailableException) {
stateAndRefs.clear()
log.warn(e.message)
// retry only if there are locked states that may become available again (or consumed with change)
} finally {
statement.close()
}
}
log.warn("Coin selection failed on attempt $retryCount")
// TODO: revisit the back off strategy for contended spending.
if (retryCount != MAX_RETRIES) {
FlowStateMachineImpl.sleep(RETRY_SLEEP * retryCount.toLong())
}
}
log.warn("Insufficient spendable states identified for $amount")
return stateAndRefs
}
override fun <T : ContractState> softLockedStates(lockId: UUID?): List<StateAndRef<T>> {
val stateAndRefs =
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
var query = select(VaultSchema.VaultStates::class)
.where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED)
.and(VaultSchema.VaultStates::contractStateClassName eq Cash.State::class.java.name)
if (lockId != null)
query.and(VaultSchema.VaultStates::lockId eq lockId)
else
query.and(VaultSchema.VaultStates::lockId.notNull())
query.get()
.map { it ->
val stateRef = StateRef(SecureHash.parse(it.txId), it.index)
val state = it.contractState.deserialize<TransactionState<T>>(storageKryo())
StateAndRef(state, stateRef)
}.toList()
}
return stateAndRefs
}
/**
* Generate a transaction that moves an amount of currency to the given pubkey.
*
* @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set
* of given parties. This can be useful if the party you're trying to pay has expectations
* about which type of asset claims they are willing to accept.
*/
@Suspendable
override fun generateSpend(tx: TransactionBuilder,
amount: Amount<Currency>,
to: CompositeKey,
@ -242,30 +473,25 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
//
// Finally, we add the states to the provided partial transaction.
val assetsStates = unconsumedStates<Cash.State>()
// Retrieve unspent and unlocked cash states that meet our spending criteria.
val acceptableCoins = unconsumedStatesForSpending<Cash.State>(amount, onlyFromParties, tx.notary, tx.lockId)
val currency = amount.token
var acceptableCoins = run {
val ofCurrency = assetsStates.filter { it.state.data.amount.token.product == currency }
if (onlyFromParties != null)
ofCurrency.filter { it.state.data.amount.token.issuer.party in onlyFromParties }
else
ofCurrency
}
tx.notary = acceptableCoins.firstOrNull()?.state?.notary
// TODO: We should be prepared to produce multiple transactions spending inputs from
// different notaries, or at least group states by notary and take the set with the
// highest total value
acceptableCoins = acceptableCoins.filter { it.state.notary == tx.notary }
// highest total value.
// notary may be associated with locked state only
tx.notary = acceptableCoins.firstOrNull()?.state?.notary
val (gathered, gatheredAmount) = gatherCoins(acceptableCoins, amount)
val takeChangeFrom = gathered.firstOrNull()
val change = if (takeChangeFrom != null && gatheredAmount > amount) {
Amount(gatheredAmount.quantity - amount.quantity, takeChangeFrom.state.data.amount.token)
} else {
null
}
val keysUsed = gathered.map { it.state.data.owner }.toSet()
val keysUsed = gathered.map { it.state.data.owner }
val states = gathered.groupBy { it.state.data.amount.token.issuer }.map {
val coins = it.value
@ -293,15 +519,14 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
for (state in outputs) tx.addOutputState(state)
// What if we already have a move command with the right keys? Filter it out here or in platform code?
val keysList = keysUsed.toList()
tx.addCommand(Cash().generateMoveCommand(), keysList)
tx.addCommand(Cash().generateMoveCommand(), keysUsed)
// update Vault
// notify(tx.toWireTransaction())
// Vault update must be completed AFTER transaction is recorded to ledger storage!!!
// (this is accomplished within the recordTransaction function)
return Pair(tx, keysList)
return Pair(tx, keysUsed)
}
private fun deriveState(txState: TransactionState<Cash.State>, amount: Amount<Issued<Currency>>, owner: CompositeKey)
@ -325,8 +550,12 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
gatheredAmount += Amount(c.state.data.amount.quantity, amount.token)
}
if (gatheredAmount < amount)
if (gatheredAmount < amount) {
log.trace("Insufficient balance: requested $amount, available $gatheredAmount (total balance ${cashBalances[amount.token]})")
throw InsufficientBalanceException(amount - gatheredAmount)
}
log.trace("Gathered coins: requested $amount, available $gatheredAmount, change: ${gatheredAmount - amount}")
return Pair(gathered, gatheredAmount)
}
@ -339,20 +568,28 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
// Retrieve all unconsumed states for this transaction's inputs
val consumedStates = HashSet<StateAndRef<ContractState>>()
if (tx.inputs.isNotEmpty()) {
val stateRefs = tx.inputs.fold("") { stateRefs, it -> stateRefs + "('${it.txhash}','${it.index}')," }.dropLast(1)
val stateRefs = stateRefsToCompositeKeyStr(tx.inputs)
// TODO: using native JDBC until requery supports SELECT WHERE COMPOSITE_KEY IN
// https://github.com/requery/requery/issues/434
val statement = configuration.jdbcSession().createStatement()
val rs = statement.executeQuery("SELECT transaction_id, output_index, contract_state " +
"FROM vault_states " +
"WHERE ((transaction_id, output_index) IN ($stateRefs)) " +
"AND (state_status = 0)")
while (rs.next()) {
val txHash = SecureHash.parse(rs.getString(1))
val index = rs.getInt(2)
val state = rs.getBytes(3).deserialize<TransactionState<ContractState>>(storageKryo())
consumedStates.add(StateAndRef(state, StateRef(txHash, index)))
try {
// TODO: upgrade to Requery 1.2.0 and rewrite with Requery DSL (https://github.com/requery/requery/issues/434)
val rs = statement.executeQuery("SELECT transaction_id, output_index, contract_state " +
"FROM vault_states " +
"WHERE ((transaction_id, output_index) IN ($stateRefs)) " +
"AND (state_status = 0)")
while (rs.next()) {
val txHash = SecureHash.parse(rs.getString(1))
val index = rs.getInt(2)
val state = rs.getBytes(3).deserialize<TransactionState<ContractState>>(storageKryo())
consumedStates.add(StateAndRef(state, StateRef(txHash, index)))
}
} catch (e: SQLException) {
log.error("""Failed retrieving state refs for: $stateRefs
$e.
""")
}
finally { statement.close() }
}
// Is transaction irrelevant?
@ -387,4 +624,11 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
is LinearState -> state.isRelevant(ourKeys)
else -> false
}
/**
* Helper method to generate a string formatted list of Composite Keys for SQL IN clause
*/
private fun stateRefsToCompositeKeyStr(stateRefs: List<StateRef>): String {
return stateRefs.fold("") { stateRefsAsStr, it -> stateRefsAsStr + "('${it.txhash}','${it.index}')," }.dropLast(1)
}
}

View File

@ -0,0 +1,61 @@
package net.corda.node.services.vault
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.services.VaultService
import net.corda.core.utilities.loggerFor
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AddOrRemove
import java.util.*
class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) {
private companion object {
val log = loggerFor<VaultSoftLockManager>()
}
private val trackingFlowIds: MutableSet<UUID> = Collections.synchronizedSet(HashSet())
init {
smm.changes.subscribe { change ->
if (change.addOrRemove == AddOrRemove.REMOVE && trackingFlowIds.contains(change.id.uuid)) {
log.trace( "${change.addOrRemove} Flow name ${change.logic.javaClass} with id ${change.id}")
unregisterSoftLocks(change.id, change.logic)
}
trackingFlowIds.remove(change.id.uuid)
}
// Discussion
//
// The intent of the following approach is to support what might be a common pattern in a flow:
// 1. Create state
// 2. Do something with state
// without possibility of another flow intercepting the state between 1 and 2,
// since we cannot lock the state before it exists. e.g. Issue and then Move some Cash.
//
// The downside is we could have a long running flow that holds a lock for a long period of time.
// However, the lock can be programmatically released, like any other soft lock,
// should we want a long running flow that creates a visible state mid way through.
vault.rawUpdates.subscribe { update ->
update.flowId?.let {
if (update.produced.isNotEmpty()) {
registerSoftLocks(update.flowId as UUID, update.produced.map { it.ref })
trackingFlowIds.add(update.flowId as UUID)
}
}
}
}
private fun registerSoftLocks(flowId: UUID, stateRefs: List<StateRef>) {
log.trace("Reserving soft locks for flow id $flowId and states $stateRefs")
vault.softLockReserve(flowId, stateRefs.toSet())
}
private fun unregisterSoftLocks(id: StateMachineRunId, logic: FlowLogic<*>) {
val flowClassName = logic.javaClass.simpleName
log.trace("Releasing soft locks for flow $flowClassName with flow id ${id.uuid}")
vault.softLockRelease(id.uuid)
}
}

View File

@ -1,156 +0,0 @@
package net.corda.node.services
import net.corda.contracts.asset.Cash
import net.corda.contracts.testing.fillWithSomeTestCash
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.POUNDS
import net.corda.core.contracts.TransactionType
import net.corda.core.contracts.`issued by`
import net.corda.core.crypto.composite
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.unconsumedStates
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.LogHelper
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.MEGA_CORP
import net.corda.testing.MEGA_CORP_KEY
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.io.Closeable
import java.util.*
import kotlin.test.assertEquals
class NodeVaultServiceTest {
lateinit var dataSource: Closeable
lateinit var database: Database
private val dataSourceProps = makeTestDataSourceProperties()
@Before
fun setUp() {
LogHelper.setLevel(NodeVaultService::class)
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
}
@After
fun tearDown() {
dataSource.close()
LogHelper.reset(NodeVaultService::class)
}
@Test
fun `states not local to instance`() {
databaseTransaction(database) {
val services1 = object : MockServices() {
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
vaultService.notify(stx.tx)
}
}
}
services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
val w1 = services1.vaultService.unconsumedStates<Cash.State>()
assertThat(w1).hasSize(3)
val originalStorage = services1.storageService
val originalVault = services1.vaultService
val services2 = object : MockServices() {
override val vaultService: VaultService get() = originalVault
// We need to be able to find the same transactions as before, too.
override val storageService: TxWritableStorageService get() = originalStorage
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
vaultService.notify(stx.tx)
}
}
}
val w2 = services2.vaultService.unconsumedStates<Cash.State>()
assertThat(w2).hasSize(3)
}
}
@Test
fun `states for refs`() {
databaseTransaction(database) {
val services1 = object : MockServices() {
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
vaultService.notify(stx.tx)
}
}
}
services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
val w1 = services1.vaultService.unconsumedStates<Cash.State>().toList()
assertThat(w1).hasSize(3)
val stateRefs = listOf(w1[1].ref, w1[2].ref)
val states = services1.vaultService.statesForRefs(stateRefs)
assertThat(states).hasSize(2)
}
}
@Test
fun addNoteToTransaction() {
databaseTransaction(database) {
val services = object : MockServices() {
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
}
}
val freshKey = services.legalIdentityKey
// Issue a txn to Send us some Money
val usefulTX = TransactionType.General.Builder(null).apply {
Cash().generateIssue(this, 100.DOLLARS `issued by` MEGA_CORP.ref(1), freshKey.public.composite, DUMMY_NOTARY)
signWith(MEGA_CORP_KEY)
}.toSignedTransaction()
services.recordTransactions(listOf(usefulTX))
services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 1")
services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 2")
services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 3")
assertEquals(3, services.vaultService.getTransactionNotes(usefulTX.id).count())
// Issue more Money (GBP)
val anotherTX = TransactionType.General.Builder(null).apply {
Cash().generateIssue(this, 200.POUNDS `issued by` MEGA_CORP.ref(1), freshKey.public.composite, DUMMY_NOTARY)
signWith(MEGA_CORP_KEY)
}.toSignedTransaction()
services.recordTransactions(listOf(anotherTX))
services.vaultService.addNoteToTransaction(anotherTX.id, "GPB Sample Note 1")
assertEquals(1, services.vaultService.getTransactionNotes(anotherTX.id).count())
}
}
}

View File

@ -7,7 +7,6 @@ import net.corda.contracts.testing.fillWithSomeTestDeals
import net.corda.contracts.testing.fillWithSomeTestLinearStates
import net.corda.core.contracts.*
import net.corda.core.crypto.composite
import net.corda.core.node.recordTransactions
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.consumedStates
import net.corda.core.node.services.unconsumedStates
@ -15,6 +14,8 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.core.utilities.LogHelper
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction
@ -32,6 +33,8 @@ import org.junit.Before
import org.junit.Test
import java.io.Closeable
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.test.assertEquals
import kotlin.test.assertNull
@ -52,7 +55,7 @@ class VaultWithCashTest {
database = dataSourceAndDatabase.second
databaseTransaction(database) {
services = object : MockServices() {
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
override val vaultService: VaultService = makeVaultService(dataSourceProps)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
@ -128,6 +131,98 @@ class VaultWithCashTest {
}
}
@Test
fun `issue and attempt double spend`() {
val freshKey = services.keyManagementService.freshKey()
databaseTransaction(database) {
// A tx that sends us money.
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 10, 10, Random(0L),
issuedBy = MEGA_CORP.ref(1),
issuerKey = MEGA_CORP_KEY,
ownedBy = freshKey.public.composite)
println("Cash balance: ${vault.cashBalances[USD]}")
assertThat(vault.unconsumedStates<Cash.State>()).hasSize(10)
assertThat(vault.softLockedStates<Cash.State>()).hasSize(0)
}
val backgroundExecutor = Executors.newFixedThreadPool(2)
val countDown = CountDownLatch(2)
// 1st tx that spends our money.
backgroundExecutor.submit {
databaseTransaction(database) {
try {
val txn1 =
TransactionType.General.Builder(DUMMY_NOTARY).apply {
vault.generateSpend(this, 60.DOLLARS, BOB_PUBKEY)
signWith(freshKey)
signWith(DUMMY_NOTARY_KEY)
}.toSignedTransaction()
println("txn1: ${txn1.id} spent ${((txn1.tx.outputs[0].data) as Cash.State).amount}")
println("""txn1 states:
UNCONSUMED: ${vault.unconsumedStates<Cash.State>().count()} : ${vault.unconsumedStates<Cash.State>()},
CONSUMED: ${vault.consumedStates<Cash.State>().count()} : ${vault.consumedStates<Cash.State>()},
LOCKED: ${vault.softLockedStates<Cash.State>().count()} : ${vault.softLockedStates<Cash.State>()}
""")
services.recordTransactions(txn1)
println("txn1: Cash balance: ${vault.cashBalances[USD]}")
println("""txn1 states:
UNCONSUMED: ${vault.unconsumedStates<Cash.State>().count()} : ${vault.unconsumedStates<Cash.State>()},
CONSUMED: ${vault.consumedStates<Cash.State>().count()} : ${vault.consumedStates<Cash.State>()},
LOCKED: ${vault.softLockedStates<Cash.State>().count()} : ${vault.softLockedStates<Cash.State>()}
""")
txn1
}
catch(e: Exception) {
println(e)
}
}
println("txn1 COMMITTED!")
countDown.countDown()
}
// 2nd tx that attempts to spend same money
backgroundExecutor.submit {
databaseTransaction(database) {
try {
val txn2 =
TransactionType.General.Builder(DUMMY_NOTARY).apply {
vault.generateSpend(this, 80.DOLLARS, BOB_PUBKEY)
signWith(freshKey)
signWith(DUMMY_NOTARY_KEY)
}.toSignedTransaction()
println("txn2: ${txn2.id} spent ${((txn2.tx.outputs[0].data) as Cash.State).amount}")
println("""txn2 states:
UNCONSUMED: ${vault.unconsumedStates<Cash.State>().count()} : ${vault.unconsumedStates<Cash.State>()},
CONSUMED: ${vault.consumedStates<Cash.State>().count()} : ${vault.consumedStates<Cash.State>()},
LOCKED: ${vault.softLockedStates<Cash.State>().count()} : ${vault.softLockedStates<Cash.State>()}
""")
services.recordTransactions(txn2)
println("txn2: Cash balance: ${vault.cashBalances[USD]}")
println("""txn2 states:
UNCONSUMED: ${vault.unconsumedStates<Cash.State>().count()} : ${vault.unconsumedStates<Cash.State>()},
CONSUMED: ${vault.consumedStates<Cash.State>().count()} : ${vault.consumedStates<Cash.State>()},
LOCKED: ${vault.softLockedStates<Cash.State>().count()} : ${vault.softLockedStates<Cash.State>()}
""")
txn2
}
catch(e: Exception) {
println(e)
}
}
println("txn2 COMMITTED!")
countDown.countDown()
}
countDown.await()
databaseTransaction(database) {
println("Cash balance: ${vault.cashBalances[USD]}")
assertThat(vault.cashBalances[USD]).isIn(DOLLARS(20),DOLLARS(40))
}
}
@Test
fun `branching LinearStates fails to verify`() {
databaseTransaction(database) {

View File

@ -0,0 +1,393 @@
package net.corda.node.services.vault
import net.corda.contracts.asset.Cash
import net.corda.contracts.testing.fillWithSomeTestCash
import net.corda.core.contracts.*
import net.corda.core.crypto.composite
import net.corda.core.flows.FlowException
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.unconsumedStates
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.LogHelper
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.MEGA_CORP
import net.corda.testing.MEGA_CORP_KEY
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.io.Closeable
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.test.assertEquals
import kotlin.test.assertNull
class NodeVaultServiceTest {
lateinit var services: MockServices
val vault: VaultService get() = services.vaultService
lateinit var dataSource: Closeable
lateinit var database: Database
@Before
fun setUp() {
LogHelper.setLevel(NodeVaultService::class)
val dataSourceProps = makeTestDataSourceProperties()
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
databaseTransaction(database) {
services = object : MockServices() {
override val vaultService: VaultService = makeVaultService(dataSourceProps)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
}
}
}
}
@After
fun tearDown() {
dataSource.close()
LogHelper.reset(NodeVaultService::class)
}
@Test
fun `states not local to instance`() {
databaseTransaction(database) {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
val w1 = services.vaultService.unconsumedStates<Cash.State>()
assertThat(w1).hasSize(3)
val originalStorage = services.storageService
val originalVault = services.vaultService
val services2 = object : MockServices() {
override val vaultService: VaultService get() = originalVault
// We need to be able to find the same transactions as before, too.
override val storageService: TxWritableStorageService get() = originalStorage
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
storageService.validatedTransactions.addTransaction(stx)
vaultService.notify(stx.tx)
}
}
}
val w2 = services2.vaultService.unconsumedStates<Cash.State>()
assertThat(w2).hasSize(3)
}
}
@Test
fun `states for refs`() {
databaseTransaction(database) {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
val w1 = services.vaultService.unconsumedStates<Cash.State>().toList()
assertThat(w1).hasSize(3)
val stateRefs = listOf(w1[1].ref, w1[2].ref)
val states = services.vaultService.statesForRefs(stateRefs)
assertThat(states).hasSize(2)
}
}
@Test
fun `states soft locking reserve and release`() {
databaseTransaction(database) {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
val unconsumedStates = services.vaultService.unconsumedStates<Cash.State>().toList()
assertThat(unconsumedStates).hasSize(3)
val stateRefsToSoftLock = setOf(unconsumedStates[1].ref, unconsumedStates[2].ref)
// soft lock two of the three states
val softLockId = UUID.randomUUID()
services.vaultService.softLockReserve(softLockId, stateRefsToSoftLock)
// all softlocked states
assertThat(services.vaultService.softLockedStates<Cash.State>()).hasSize(2)
// my softlocked states
assertThat(services.vaultService.softLockedStates<Cash.State>(softLockId)).hasSize(2)
// excluding softlocked states
val unlockedStates1 = services.vaultService.unconsumedStates<Cash.State>(includeSoftLockedStates = false)
assertThat(unlockedStates1).hasSize(1)
// soft lock release one of the states explicitly
services.vaultService.softLockRelease(softLockId, setOf(unconsumedStates[1].ref))
val unlockedStates2 = services.vaultService.unconsumedStates<Cash.State>(includeSoftLockedStates = false)
assertThat(unlockedStates2).hasSize(2)
// soft lock release the rest by id
services.vaultService.softLockRelease(softLockId)
val unlockedStates = services.vaultService.unconsumedStates<Cash.State>(includeSoftLockedStates = false).toList()
assertThat(unlockedStates).hasSize(3)
// should be back to original states
assertThat(unlockedStates).isEqualTo(unconsumedStates)
}
}
@Test
fun `soft locking attempt concurrent reserve`() {
val backgroundExecutor = Executors.newFixedThreadPool(2)
val countDown = CountDownLatch(2)
val softLockId1 = UUID.randomUUID()
val softLockId2 = UUID.randomUUID()
val vaultStates =
databaseTransaction(database) {
assertNull(vault.cashBalances[USD])
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
}
val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet()
println("State Refs:: $stateRefsToSoftLock")
// 1st tx locks states
backgroundExecutor.submit {
try {
databaseTransaction(database) {
vault.softLockReserve(softLockId1, stateRefsToSoftLock)
assertThat(vault.softLockedStates<Cash.State>(softLockId1)).hasSize(3)
}
println("SOFT LOCK STATES #1 succeeded")
} catch(e: Throwable) {
println("SOFT LOCK STATES #1 failed")
} finally {
countDown.countDown()
}
}
// 2nd tx attempts to lock same states
backgroundExecutor.submit {
try {
Thread.sleep(100) // let 1st thread soft lock them 1st
databaseTransaction(database) {
vault.softLockReserve(softLockId2, stateRefsToSoftLock)
assertThat(vault.softLockedStates<Cash.State>(softLockId2)).hasSize(3)
}
println("SOFT LOCK STATES #2 succeeded")
} catch(e: Throwable) {
println("SOFT LOCK STATES #2 failed")
} finally {
countDown.countDown()
}
}
countDown.await()
databaseTransaction(database) {
val lockStatesId1 = vault.softLockedStates<Cash.State>(softLockId1)
println("SOFT LOCK #1 final states: $lockStatesId1")
assertThat(lockStatesId1.size).isIn(0, 3)
val lockStatesId2 = vault.softLockedStates<Cash.State>(softLockId2)
println("SOFT LOCK #2 final states: $lockStatesId2")
assertThat(lockStatesId2.size).isIn(0, 3)
}
}
@Test
fun `soft locking partial reserve states fails`() {
val softLockId1 = UUID.randomUUID()
val softLockId2 = UUID.randomUUID()
val vaultStates =
databaseTransaction(database) {
assertNull(vault.cashBalances[USD])
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
}
val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet()
println("State Refs:: $stateRefsToSoftLock")
// lock 1st state with LockId1
databaseTransaction(database) {
vault.softLockReserve(softLockId1, setOf(stateRefsToSoftLock.first()))
assertThat(vault.softLockedStates<Cash.State>(softLockId1)).hasSize(1)
}
// attempt to lock all 3 states with LockId2
databaseTransaction(database) {
assertThatExceptionOfType(FlowException::class.java).isThrownBy(
{ vault.softLockReserve(softLockId2, stateRefsToSoftLock) }
).withMessageContaining("only 2 rows available").withNoCause()
}
}
@Test
fun `attempt to lock states already soft locked by me`() {
val softLockId1 = UUID.randomUUID()
val vaultStates =
databaseTransaction(database) {
assertNull(vault.cashBalances[USD])
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
}
val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet()
println("State Refs:: $stateRefsToSoftLock")
// lock states with LockId1
databaseTransaction(database) {
vault.softLockReserve(softLockId1, stateRefsToSoftLock)
assertThat(vault.softLockedStates<Cash.State>(softLockId1)).hasSize(3)
}
// attempt to relock same states with LockId1
databaseTransaction(database) {
vault.softLockReserve(softLockId1, stateRefsToSoftLock)
assertThat(vault.softLockedStates<Cash.State>(softLockId1)).hasSize(3)
}
}
@Test
fun `lock additional states to some already soft locked by me`() {
val softLockId1 = UUID.randomUUID()
val vaultStates =
databaseTransaction(database) {
assertNull(vault.cashBalances[USD])
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
}
val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet()
println("State Refs:: $stateRefsToSoftLock")
// lock states with LockId1
databaseTransaction(database) {
vault.softLockReserve(softLockId1, setOf(stateRefsToSoftLock.first()))
assertThat(vault.softLockedStates<Cash.State>(softLockId1)).hasSize(1)
}
// attempt to lock all states with LockId1 (including previously already locked one)
databaseTransaction(database) {
vault.softLockReserve(softLockId1, stateRefsToSoftLock)
assertThat(vault.softLockedStates<Cash.State>(softLockId1)).hasSize(3)
}
}
@Test
fun `unconsumedStatesForSpending exact amount`() {
databaseTransaction(database) {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 1, 1, Random(0L))
val unconsumedStates = services.vaultService.unconsumedStates<Cash.State>().toList()
assertThat(unconsumedStates).hasSize(1)
val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending<Cash.State>(100.DOLLARS, lockId = UUID.randomUUID())
spendableStatesUSD.forEach(::println)
assertThat(spendableStatesUSD).hasSize(1)
assertThat(spendableStatesUSD[0].state.data.amount.quantity).isEqualTo(100L*100)
assertThat(services.vaultService.softLockedStates<Cash.State>()).hasSize(1)
}
}
@Test
fun `unconsumedStatesForSpending insufficient amount`() {
databaseTransaction(database) {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 1, 1, Random(0L))
val unconsumedStates = services.vaultService.unconsumedStates<Cash.State>().toList()
assertThat(unconsumedStates).hasSize(1)
val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending<Cash.State>(110.DOLLARS, lockId = UUID.randomUUID())
spendableStatesUSD.forEach(::println)
assertThat(spendableStatesUSD).hasSize(1)
assertThat(services.vaultService.softLockedStates<Cash.State>()).hasSize(0)
}
}
@Test
fun `unconsumedStatesForSpending small amount`() {
databaseTransaction(database) {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 2, 2, Random(0L))
val unconsumedStates = services.vaultService.unconsumedStates<Cash.State>().toList()
assertThat(unconsumedStates).hasSize(2)
val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending<Cash.State>(1.DOLLARS, lockId = UUID.randomUUID())
spendableStatesUSD.forEach(::println)
assertThat(spendableStatesUSD).hasSize(1)
assertThat(spendableStatesUSD[0].state.data.amount.quantity).isGreaterThanOrEqualTo(1L*100)
assertThat(services.vaultService.softLockedStates<Cash.State>()).hasSize(1)
}
}
@Test
fun `states soft locking query granularity`() {
databaseTransaction(database) {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 10, 10, Random(0L))
services.fillWithSomeTestCash(100.POUNDS, DUMMY_NOTARY, 10, 10, Random(0L))
services.fillWithSomeTestCash(100.SWISS_FRANCS, DUMMY_NOTARY, 10, 10, Random(0L))
val allStates = services.vaultService.unconsumedStates<Cash.State>()
assertThat(allStates).hasSize(30)
for (i in 1..5) {
val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending<Cash.State>(20.DOLLARS, lockId = UUID.randomUUID())
spendableStatesUSD.forEach(::println)
}
// note only 3 spend attempts succeed with a total of 8 states
assertThat(services.vaultService.softLockedStates<Cash.State>()).hasSize(8)
}
}
@Test
fun addNoteToTransaction() {
databaseTransaction(database) {
val freshKey = services.legalIdentityKey
// Issue a txn to Send us some Money
val usefulTX = TransactionType.General.Builder(null).apply {
Cash().generateIssue(this, 100.DOLLARS `issued by` MEGA_CORP.ref(1), freshKey.public.composite, DUMMY_NOTARY)
signWith(MEGA_CORP_KEY)
}.toSignedTransaction()
services.recordTransactions(listOf(usefulTX))
services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 1")
services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 2")
services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 3")
assertEquals(3, services.vaultService.getTransactionNotes(usefulTX.id).count())
// Issue more Money (GBP)
val anotherTX = TransactionType.General.Builder(null).apply {
Cash().generateIssue(this, 200.POUNDS `issued by` MEGA_CORP.ref(1), freshKey.public.composite, DUMMY_NOTARY)
signWith(MEGA_CORP_KEY)
}.toSignedTransaction()
services.recordTransactions(listOf(anotherTX))
services.vaultService.addNoteToTransaction(anotherTX.id, "GPB Sample Note 1")
assertEquals(1, services.vaultService.getTransactionNotes(anotherTX.id).count())
}
}
}