mirror of
https://github.com/corda/corda.git
synced 2025-06-18 07:08:15 +00:00
Initial implementation of Vault Persistence using Requery (#191)
* Initial prototyping with Requery as a persistence replacement for Exposed/Hibernate Applied changes following PR review by RP Updated timestamp naming (removed committedTimestamp) and StateStatus (removed AWAITING_CONSENSUS) after discussion with RP. Removed FungibleState and LinearState schemas (and associated tests) - awaiting Requery uni-directional relationship fix. Added Transaction propagation such that requery re-uses any existing transaction context. Made requery default logging configurable (disabled by default) Nullable fields are now truly nullable (in the Kotlin and DDL sense) Fix for SimmValuation integration test. Workarounds applied to resolve Requery issues when sharing Transactional context. Addressed PR review comments from MH. Further updates following re-review by RP/MH Further updates following additional PR review comments by RP Minor update following additional PR review comments by RP Optimised makeUpdate state processing code. Resolved conflicts after rebase. Additional Unit tests and bug fix for correct spending of multiple contract state types within a single transaction. Required interface change to states() API to take a setOf (ContractStateClassTypes) Minor code clean-up. Re-write NodeVaultService consumed state makeUpdate function using SQL. * Resolve conflict after rebase from master
This commit is contained in:
@ -203,7 +203,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
netMapCache = InMemoryNetworkMapCache()
|
||||
net = makeMessagingService()
|
||||
schemas = makeSchemaService()
|
||||
vault = makeVaultService()
|
||||
vault = makeVaultService(configuration.dataSourceProperties)
|
||||
|
||||
info = makeInfo()
|
||||
identity = makeIdentityService()
|
||||
@ -452,7 +452,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
// TODO: sort out ordering of open & protected modifiers of functions in this class.
|
||||
protected open fun makeVaultService(): VaultService = NodeVaultService(services)
|
||||
protected open fun makeVaultService(dataSourceProperties: Properties): VaultService = NodeVaultService(services, dataSourceProperties)
|
||||
|
||||
protected open fun makeSchemaService(): SchemaService = NodeSchemaService()
|
||||
|
||||
|
@ -48,7 +48,7 @@ class CordaRPCOpsImpl(
|
||||
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> {
|
||||
return databaseTransaction(database) {
|
||||
val (vault, updates) = services.vaultService.track()
|
||||
Pair(vault.states, updates)
|
||||
Pair(vault.states.toList(), updates)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,149 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import io.requery.EntityCache
|
||||
import io.requery.TransactionIsolation
|
||||
import io.requery.TransactionListener
|
||||
import io.requery.cache.WeakEntityCache
|
||||
import io.requery.meta.EntityModel
|
||||
import io.requery.sql.*
|
||||
import io.requery.sql.platform.H2
|
||||
import io.requery.util.function.Function
|
||||
import io.requery.util.function.Supplier
|
||||
import net.corda.core.schemas.requery.converters.InstantConverter
|
||||
import net.corda.core.schemas.requery.converters.StateRefConverter
|
||||
import net.corda.core.schemas.requery.converters.VaultStateStatusConverter
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executor
|
||||
import javax.sql.DataSource
|
||||
|
||||
/**
|
||||
* Requery KotlinConfiguration wrapper class to enable us to pass in an existing database connection and
|
||||
* associated transaction context.
|
||||
*/
|
||||
class KotlinConfigurationTransactionWrapper(private val model: EntityModel,
|
||||
dataSource: DataSource,
|
||||
private val mapping: Mapping? = null,
|
||||
private val platform: Platform? = null,
|
||||
private val cache: EntityCache = WeakEntityCache(),
|
||||
private val useDefaultLogging: Boolean = false,
|
||||
private val statementCacheSize: Int = 0,
|
||||
private val batchUpdateSize: Int = 64,
|
||||
private val quoteTableNames: Boolean = false,
|
||||
private val quoteColumnNames: Boolean = false,
|
||||
private val tableTransformer: Function<String, String>? = null,
|
||||
private val columnTransformer: Function<String, String>? = null,
|
||||
private val transactionMode: TransactionMode = TransactionMode.NONE,
|
||||
private val transactionIsolation: TransactionIsolation? = null,
|
||||
private val statementListeners: Set<StatementListener> = LinkedHashSet(),
|
||||
private val entityStateListeners: Set<EntityStateListener<Any>> = LinkedHashSet(),
|
||||
private val transactionListeners: Set<Supplier<TransactionListener>> = LinkedHashSet(),
|
||||
private val writeExecutor: Executor? = null) : Configuration {
|
||||
|
||||
private val connectionProvider = CordaDataSourceConnectionProvider(dataSource)
|
||||
|
||||
override fun getBatchUpdateSize(): Int {
|
||||
return batchUpdateSize
|
||||
}
|
||||
|
||||
override fun getConnectionProvider(): ConnectionProvider? {
|
||||
return connectionProvider
|
||||
}
|
||||
|
||||
override fun getCache(): EntityCache? {
|
||||
return cache
|
||||
}
|
||||
|
||||
override fun getEntityStateListeners(): Set<EntityStateListener<Any>> {
|
||||
return entityStateListeners
|
||||
}
|
||||
|
||||
override fun getMapping(): Mapping? {
|
||||
// TODO: database platform provider to become configurable and parameterised into this configuration
|
||||
val customMapping = GenericMapping(H2())
|
||||
|
||||
// register our custom converters
|
||||
val instantConverter = InstantConverter()
|
||||
customMapping.addConverter(instantConverter, instantConverter.mappedType)
|
||||
val vaultStateStatusConverter = VaultStateStatusConverter()
|
||||
customMapping.addConverter(vaultStateStatusConverter, vaultStateStatusConverter.mappedType)
|
||||
customMapping.addConverter(StateRefConverter(), StateRefConverter::getMappedType.javaClass)
|
||||
return customMapping
|
||||
}
|
||||
|
||||
override fun getModel(): EntityModel {
|
||||
return model
|
||||
}
|
||||
|
||||
override fun getPlatform(): Platform? {
|
||||
return platform
|
||||
}
|
||||
|
||||
override fun getQuoteTableNames(): Boolean {
|
||||
return quoteTableNames
|
||||
}
|
||||
|
||||
override fun getQuoteColumnNames(): Boolean {
|
||||
return quoteColumnNames
|
||||
}
|
||||
|
||||
override fun getTableTransformer(): Function<String, String>? {
|
||||
return tableTransformer
|
||||
}
|
||||
|
||||
override fun getColumnTransformer(): Function<String, String>? {
|
||||
return columnTransformer
|
||||
}
|
||||
|
||||
override fun getStatementCacheSize(): Int {
|
||||
return statementCacheSize
|
||||
}
|
||||
|
||||
override fun getStatementListeners(): Set<StatementListener>? {
|
||||
return statementListeners
|
||||
}
|
||||
|
||||
override fun getTransactionMode(): TransactionMode? {
|
||||
return transactionMode
|
||||
}
|
||||
|
||||
override fun getTransactionIsolation(): TransactionIsolation? {
|
||||
return transactionIsolation
|
||||
}
|
||||
|
||||
override fun getTransactionListenerFactories(): Set<Supplier<TransactionListener>>? {
|
||||
return transactionListeners
|
||||
}
|
||||
|
||||
override fun getUseDefaultLogging(): Boolean {
|
||||
return useDefaultLogging
|
||||
}
|
||||
|
||||
override fun getWriteExecutor(): Executor? {
|
||||
return writeExecutor
|
||||
}
|
||||
|
||||
class CordaDataSourceConnectionProvider(val dataSource: DataSource) : ConnectionProvider {
|
||||
override fun getConnection(): Connection {
|
||||
val tx = TransactionManager.manager.currentOrNull()
|
||||
return CordaConnection(
|
||||
tx?.connection ?:
|
||||
TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class CordaConnection(val connection: Connection) : Connection by connection {
|
||||
override fun close() {
|
||||
// TODO: address requery auto-closing the connection in SchemaModifier upon table creation
|
||||
// https://github.com/requery/requery/issues/424
|
||||
}
|
||||
|
||||
override fun setAutoCommit(autoCommit: Boolean) {
|
||||
// TODO: address requery bug in ConnectionTransaction commit()
|
||||
// https://github.com/requery/requery/issues/423
|
||||
connection.autoCommit = false
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import io.requery.Persistable
|
||||
import io.requery.meta.EntityModel
|
||||
import io.requery.sql.*
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class RequeryConfiguration(val properties: Properties, val useDefaultLogging: Boolean = false) {
|
||||
|
||||
companion object {
|
||||
val logger = loggerFor<RequeryConfiguration>()
|
||||
}
|
||||
|
||||
// TODO:
|
||||
// 1. schemaService schemaOptions needs to be applied: eg. default schema, table prefix
|
||||
// 2. set other generic database configuration options: show_sql, format_sql
|
||||
// 3. Configure Requery Database platform specific features (see http://requery.github.io/javadoc/io/requery/sql/Platform.html)
|
||||
// 4. Configure Cache Manager and Cache Provider and set in Requery Configuration (see http://requery.github.io/javadoc/io/requery/EntityCache.html)
|
||||
// 5. Consider database schema deployment/upgrade strategies to replace dynamic table creation.
|
||||
|
||||
// Note: Annotations are pre-processed using (kapt) so no need to register dynamically
|
||||
val config = HikariConfig(properties)
|
||||
val dataSource = HikariDataSource(config)
|
||||
|
||||
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
|
||||
private val sessionFactories = ConcurrentHashMap<EntityModel, KotlinEntityDataStore<Persistable>>()
|
||||
|
||||
fun sessionForModel(model: EntityModel): KotlinEntityDataStore<Persistable> {
|
||||
return sessionFactories.computeIfAbsent(model, { makeSessionFactoryForModel(it) })
|
||||
}
|
||||
|
||||
fun makeSessionFactoryForModel(model: EntityModel): KotlinEntityDataStore<Persistable> {
|
||||
val configuration = KotlinConfigurationTransactionWrapper(model, dataSource, useDefaultLogging = this.useDefaultLogging)
|
||||
val tables = SchemaModifier(configuration)
|
||||
val mode = TableCreationMode.DROP_CREATE
|
||||
tables.createTables(mode)
|
||||
return KotlinEntityDataStore(configuration)
|
||||
}
|
||||
|
||||
// TODO: remove once Requery supports QUERY WITH COMPOSITE_KEY IN
|
||||
fun jdbcSession(): Connection {
|
||||
val ctx = TransactionManager.manager.currentOrNull()
|
||||
return ctx?.connection ?: TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import com.google.common.collect.Sets
|
||||
import io.requery.TransactionIsolation
|
||||
import io.requery.kotlin.`in`
|
||||
import io.requery.kotlin.eq
|
||||
import net.corda.contracts.asset.Cash
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.bufferUntilSubscribed
|
||||
@ -11,16 +13,20 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.createKryo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.tee
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.utilities.*
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.select
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.vault.schemas.*
|
||||
import net.corda.node.utilities.bufferUntilDatabaseCommit
|
||||
import net.corda.node.utilities.wrapWithDatabaseTransaction
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.PublicKey
|
||||
@ -37,66 +43,16 @@ import java.util.*
|
||||
* TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time.
|
||||
* TODO: have transaction storage do some caching.
|
||||
*/
|
||||
class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsToken(), VaultService {
|
||||
class NodeVaultService(private val services: ServiceHub, dataSourceProperties: Properties) : SingletonSerializeAsToken(), VaultService {
|
||||
|
||||
private companion object {
|
||||
val log = loggerFor<NodeVaultService>()
|
||||
}
|
||||
|
||||
private object StatesSetTable : JDBCHashedTable("${NODE_DATABASE_PREFIX}vault_unconsumed_states") {
|
||||
val stateRef = stateRef("transaction_id", "output_index")
|
||||
}
|
||||
|
||||
private data class TxnNote(val txnId: SecureHash, val note: String) {
|
||||
override fun toString() = "$txnId: $note"
|
||||
}
|
||||
|
||||
private object CashBalanceTable : JDBCHashedTable("${NODE_DATABASE_PREFIX}vault_cash_balances") {
|
||||
val currency = varchar("currency", 3)
|
||||
val amount = long("amount")
|
||||
}
|
||||
|
||||
private object TransactionNotesTable : JDBCHashedTable("${NODE_DATABASE_PREFIX}vault_txn_notes") {
|
||||
val txnId = secureHash("txnId").index()
|
||||
val note = text("note")
|
||||
}
|
||||
val configuration = RequeryConfiguration(dataSourceProperties)
|
||||
val session = configuration.sessionForModel(Models.VAULT)
|
||||
|
||||
private val mutex = ThreadBox(content = object {
|
||||
val unconsumedStates = object : AbstractJDBCHashSet<StateRef, StatesSetTable>(StatesSetTable) {
|
||||
override fun elementFromRow(row: ResultRow): StateRef = StateRef(row[table.stateRef.txId], row[table.stateRef.index])
|
||||
|
||||
override fun addElementToInsert(insert: InsertStatement, entry: StateRef, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.stateRef.txId] = entry.txhash
|
||||
insert[table.stateRef.index] = entry.index
|
||||
}
|
||||
}
|
||||
|
||||
val transactionNotes = object : AbstractJDBCHashSet<TxnNote, TransactionNotesTable>(TransactionNotesTable) {
|
||||
override fun elementFromRow(row: ResultRow): TxnNote = TxnNote(row[table.txnId], row[table.note])
|
||||
|
||||
override fun addElementToInsert(insert: InsertStatement, entry: TxnNote, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.txnId] = entry.txnId
|
||||
insert[table.note] = entry.note
|
||||
}
|
||||
|
||||
// TODO: caching (2nd tier db cache) and db results filtering (max records, date, other)
|
||||
fun select(txnId: SecureHash): Iterable<String> {
|
||||
return table.select { table.txnId.eq(txnId) }.map { row -> row[table.note] }.toSet().asIterable()
|
||||
}
|
||||
}
|
||||
|
||||
val cashBalances = object : AbstractJDBCHashMap<Currency, Amount<Currency>, CashBalanceTable>(CashBalanceTable) {
|
||||
override fun keyFromRow(row: ResultRow): Currency = Currency.getInstance(row[table.currency])
|
||||
override fun valueFromRow(row: ResultRow): Amount<Currency> = Amount(row[table.amount], keyFromRow(row))
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<Currency, Amount<Currency>>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.currency] = entry.key.currencyCode
|
||||
}
|
||||
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<Currency, Amount<Currency>>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.amount] = entry.value.quantity
|
||||
}
|
||||
}
|
||||
|
||||
val _updatesPublisher = PublishSubject.create<Vault.Update>()
|
||||
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update>()
|
||||
@ -105,22 +61,39 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
// For use during publishing only.
|
||||
val updatesPublisher: rx.Observer<Vault.Update> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
|
||||
|
||||
fun allUnconsumedStates(): List<StateAndRef<ContractState>> {
|
||||
// Ideally we'd map this transform onto a sequence, but we can't have a lazy list here, since accessing it
|
||||
// from a flow might end up trying to serialize the captured context - vault internal state or db context.
|
||||
return unconsumedStates.map {
|
||||
val storedTx = services.storageService.validatedTransactions.getTransaction(it.txhash) ?: throw Error("Found transaction hash ${it.txhash} in unconsumed contract states that is not in transaction storage.")
|
||||
StateAndRef(storedTx.tx.outputs[it.index], it)
|
||||
}
|
||||
}
|
||||
|
||||
fun recordUpdate(update: Vault.Update): Vault.Update {
|
||||
if (update != Vault.NoUpdate) {
|
||||
val producedStateRefs = update.produced.map { it.ref }
|
||||
val producedStateRefsMap = update.produced.associateBy { it.ref }
|
||||
val consumedStateRefs = update.consumed.map { it.ref }
|
||||
log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." }
|
||||
unconsumedStates.removeAll(consumedStateRefs)
|
||||
unconsumedStates.addAll(producedStateRefs)
|
||||
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
producedStateRefsMap.forEach { it ->
|
||||
val state = VaultStatesEntity().apply {
|
||||
txId = it.key.txhash.toString()
|
||||
index = it.key.index
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED
|
||||
contractStateClassName = it.value.state.data.javaClass.name
|
||||
// TODO: revisit Kryo bug when using THREAD_LOCAL_KYRO
|
||||
contractState = it.value.state.serialize(createKryo()).bytes
|
||||
notaryName = it.value.state.notary.name
|
||||
notaryKey = it.value.state.notary.owningKey.toBase58String()
|
||||
recordedTime = services.clock.instant()
|
||||
}
|
||||
insert(state)
|
||||
}
|
||||
consumedStateRefs.forEach { stateRef ->
|
||||
val queryKey = io.requery.proxy.CompositeKey(mapOf(VaultStatesEntity.TX_ID to stateRef.txhash.toString(),
|
||||
VaultStatesEntity.INDEX to stateRef.index))
|
||||
val state = findByKey(VaultStatesEntity::class, queryKey)
|
||||
state?.run {
|
||||
stateStatus = Vault.StateStatus.CONSUMED
|
||||
consumedTime = services.clock.instant()
|
||||
update(state)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return update
|
||||
}
|
||||
@ -133,8 +106,18 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
(produced.keys + consumed.keys).map { currency ->
|
||||
val producedAmount = produced[currency] ?: Amount(0, currency)
|
||||
val consumedAmount = consumed[currency] ?: Amount(0, currency)
|
||||
val currentBalance = cashBalances[currency] ?: Amount(0, currency)
|
||||
cashBalances[currency] = currentBalance + producedAmount - consumedAmount
|
||||
|
||||
val cashBalanceEntity = VaultCashBalancesEntity()
|
||||
cashBalanceEntity.currency = currency.currencyCode
|
||||
cashBalanceEntity.amount = producedAmount.quantity - consumedAmount.quantity
|
||||
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val state = findByKey(VaultCashBalancesEntity::class, currency.currencyCode)
|
||||
state?.run {
|
||||
amount += producedAmount.quantity - consumedAmount.quantity
|
||||
}
|
||||
upsert(state ?: cashBalanceEntity)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -147,9 +130,15 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
}
|
||||
})
|
||||
|
||||
override val cashBalances: Map<Currency, Amount<Currency>> get() = mutex.locked { HashMap(cashBalances) }
|
||||
|
||||
override val currentVault: Vault get() = mutex.locked { Vault(allUnconsumedStates()) }
|
||||
override val cashBalances: Map<Currency, Amount<Currency>> get() {
|
||||
val cashBalancesByCurrency =
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val balances = select(VaultSchema.VaultCashBalances::class)
|
||||
balances.get().toList()
|
||||
}
|
||||
return cashBalancesByCurrency.associateBy({ Currency.getInstance(it.currency) },
|
||||
{ Amount(it.amount, Currency.getInstance(it.currency)) })
|
||||
}
|
||||
|
||||
override val rawUpdates: Observable<Vault.Update>
|
||||
get() = mutex.locked { _rawUpdatesPublisher }
|
||||
@ -157,23 +146,55 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
override val updates: Observable<Vault.Update>
|
||||
get() = mutex.locked { _updatesInDbTx }
|
||||
|
||||
override fun track(): Pair<Vault, Observable<Vault.Update>> {
|
||||
override fun track(): Pair<Vault<ContractState>, Observable<Vault.Update>> {
|
||||
return mutex.locked {
|
||||
Pair(Vault(allUnconsumedStates()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
Pair(Vault(unconsumedStates<ContractState>()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a snapshot of the heads of LinearStates.
|
||||
*
|
||||
* TODO: Represent this using an actual JDBCHashMap or look at vault design further.
|
||||
*/
|
||||
override val linearHeads: Map<UniqueIdentifier, StateAndRef<LinearState>>
|
||||
get() = currentVault.states.filterStatesOfType<LinearState>().associateBy { it.state.data.linearId }.mapValues { it.value }
|
||||
override fun <T: ContractState> states(clazzes: Set<Class<T>>, statuses: EnumSet<Vault.StateStatus>): List<StateAndRef<T>> {
|
||||
val stateAndRefs =
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
var result = select(VaultSchema.VaultStates::class)
|
||||
.where(VaultSchema.VaultStates::stateStatus `in` statuses)
|
||||
// TODO: temporary fix to continue supporting track() function (until becomes Typed)
|
||||
if (!clazzes.map {it.name}.contains(ContractState::class.java.name))
|
||||
result.and (VaultSchema.VaultStates::contractStateClassName `in` (clazzes.map { it.name }))
|
||||
result.get()
|
||||
.map { it ->
|
||||
val stateRef = StateRef(SecureHash.parse(it.txId), it.index)
|
||||
// TODO: revisit Kryo bug when using THREAD_LOCAL_KYRO
|
||||
val state = it.contractState.deserialize<TransactionState<T>>(createKryo())
|
||||
StateAndRef(state, stateRef)
|
||||
}.toList()
|
||||
}
|
||||
return stateAndRefs
|
||||
}
|
||||
|
||||
override fun statesForRefs(refs: List<StateRef>): Map<StateRef, TransactionState<*>?> {
|
||||
val stateAndRefs =
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
var results: List<StateAndRef<*>> = emptyList()
|
||||
refs.forEach {
|
||||
val result = select(VaultSchema.VaultStates::class)
|
||||
.where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED)
|
||||
.and(VaultSchema.VaultStates::txId eq it.txhash.toString())
|
||||
.and(VaultSchema.VaultStates::index eq it.index)
|
||||
result.get()?.each {
|
||||
val stateRef = StateRef(SecureHash.parse(it.txId), it.index)
|
||||
val state = it.contractState.deserialize<TransactionState<*>>()
|
||||
results += StateAndRef(state, stateRef)
|
||||
}
|
||||
}
|
||||
results
|
||||
}
|
||||
|
||||
return stateAndRefs.associateBy({ it.ref }, { it.state })
|
||||
}
|
||||
|
||||
override fun notifyAll(txns: Iterable<WireTransaction>) {
|
||||
val ourKeys = services.keyManagementService.keys.keys
|
||||
val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, netDelta, ourKeys) }
|
||||
val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, ourKeys) }
|
||||
if (netDelta != Vault.NoUpdate) {
|
||||
mutex.locked {
|
||||
recordUpdate(netDelta)
|
||||
@ -184,14 +205,17 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
}
|
||||
|
||||
override fun addNoteToTransaction(txnId: SecureHash, noteText: String) {
|
||||
mutex.locked {
|
||||
transactionNotes.add(TxnNote(txnId, noteText))
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val txnNoteEntity = VaultTxnNoteEntity()
|
||||
txnNoteEntity.txId = txnId.toString()
|
||||
txnNoteEntity.note = noteText
|
||||
insert(txnNoteEntity)
|
||||
}
|
||||
}
|
||||
|
||||
override fun getTransactionNotes(txnId: SecureHash): Iterable<String> {
|
||||
mutex.locked {
|
||||
return transactionNotes.select(txnId)
|
||||
return session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
(select(VaultSchema.VaultTxnNote::class) where (VaultSchema.VaultTxnNote::txId eq txnId.toString())).get().asIterable().map { it.note }
|
||||
}
|
||||
}
|
||||
|
||||
@ -219,7 +243,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
//
|
||||
// Finally, we add the states to the provided partial transaction.
|
||||
|
||||
val assetsStates = currentVault.statesOfType<Cash.State>()
|
||||
val assetsStates = unconsumedStates<Cash.State>()
|
||||
|
||||
val currency = amount.token
|
||||
var acceptableCoins = run {
|
||||
@ -308,32 +332,36 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
return Pair(gathered, gatheredAmount)
|
||||
}
|
||||
|
||||
private fun makeUpdate(tx: WireTransaction, netDelta: Vault.Update, ourKeys: Set<PublicKey>): Vault.Update {
|
||||
private fun makeUpdate(tx: WireTransaction, ourKeys: Set<PublicKey>): Vault.Update {
|
||||
val ourNewStates = tx.outputs.
|
||||
filter { isRelevant(it.data, ourKeys) }.
|
||||
map { tx.outRef<ContractState>(it.data) }
|
||||
|
||||
// Now calculate the states that are being spent by this transaction.
|
||||
val consumedRefs = tx.inputs.toHashSet()
|
||||
// We use Guava union here as it's lazy for contains() which is how retainAll() is implemented.
|
||||
// i.e. retainAll() iterates over consumed, checking contains() on the parameter. Sets.union() does not physically create
|
||||
// a new collection and instead contains() just checks the contains() of both parameters, and so we don't end up
|
||||
// iterating over all (a potentially very large) unconsumedStates at any point.
|
||||
mutex.locked {
|
||||
consumedRefs.retainAll(Sets.union(netDelta.produced, unconsumedStates))
|
||||
// Retrieve all unconsumed states for this transaction's inputs
|
||||
val consumedStates = HashSet<StateAndRef<ContractState>>()
|
||||
if (tx.inputs.isNotEmpty()) {
|
||||
val stateRefs = tx.inputs.fold("") { stateRefs, it -> stateRefs + "('${it.txhash}','${it.index}')," }.dropLast(1)
|
||||
// TODO: using native JDBC until requery supports SELECT WHERE COMPOSITE_KEY IN
|
||||
// https://github.com/requery/requery/issues/434
|
||||
val statement = configuration.jdbcSession().createStatement()
|
||||
val rs = statement.executeQuery("SELECT transaction_id, output_index, contract_state " +
|
||||
"FROM vault_states " +
|
||||
"WHERE ((transaction_id, output_index) IN ($stateRefs)) " +
|
||||
"AND (state_status = 0)")
|
||||
while (rs.next()) {
|
||||
val txHash = SecureHash.parse(rs.getString(1))
|
||||
val index = rs.getInt(2)
|
||||
val state = rs.getBytes(3).deserialize<TransactionState<ContractState>>(createKryo())
|
||||
consumedStates.add(StateAndRef(state, StateRef(txHash, index)))
|
||||
}
|
||||
}
|
||||
|
||||
// Is transaction irrelevant?
|
||||
if (consumedRefs.isEmpty() && ourNewStates.isEmpty()) {
|
||||
if (consumedStates.isEmpty() && ourNewStates.isEmpty()) {
|
||||
log.trace { "tx ${tx.id} was irrelevant to this vault, ignoring" }
|
||||
return Vault.NoUpdate
|
||||
}
|
||||
|
||||
val consumedStates = consumedRefs.map {
|
||||
val state = services.loadState(it)
|
||||
StateAndRef(state, it)
|
||||
}.toSet()
|
||||
|
||||
return Vault.Update(consumedStates, ourNewStates.toHashSet())
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.messaging.StateMachineUpdate
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.serialization.OpaqueBytes
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.flows.CashIssueFlow
|
||||
@ -67,7 +68,7 @@ class CordaRPCOpsImplTest {
|
||||
|
||||
// Check the monitoring service wallet is empty
|
||||
databaseTransaction(aliceNode.database) {
|
||||
assertFalse(aliceNode.services.vaultService.currentVault.states.iterator().hasNext())
|
||||
assertFalse(aliceNode.services.vaultService.unconsumedStates<ContractState>().iterator().hasNext())
|
||||
}
|
||||
|
||||
// Tell the monitoring service node to issue some cash
|
||||
|
@ -488,7 +488,7 @@ class TwoPartyTradeFlowTests {
|
||||
withError: Boolean,
|
||||
owner: CompositeKey,
|
||||
issuer: AnonymousParty,
|
||||
notary: Party): Pair<Vault, List<WireTransaction>> {
|
||||
notary: Party): Pair<Vault<ContractState>, List<WireTransaction>> {
|
||||
val interimOwnerKey = MEGA_CORP_PUBKEY
|
||||
// Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she
|
||||
// wants to sell to Bob.
|
||||
@ -526,7 +526,7 @@ class TwoPartyTradeFlowTests {
|
||||
this.verifies()
|
||||
}
|
||||
|
||||
val vault = Vault(listOf("bob cash 1".outputStateAndRef(), "bob cash 2".outputStateAndRef()))
|
||||
val vault = Vault<ContractState>(listOf("bob cash 1".outputStateAndRef(), "bob cash 2".outputStateAndRef()))
|
||||
return Pair(vault, listOf(eb1, bc1, bc2))
|
||||
}
|
||||
|
||||
@ -535,7 +535,7 @@ class TwoPartyTradeFlowTests {
|
||||
owner: CompositeKey,
|
||||
amount: Amount<Issued<Currency>>,
|
||||
attachmentID: SecureHash?,
|
||||
notary: Party): Pair<Vault, List<WireTransaction>> {
|
||||
notary: Party): Pair<Vault<ContractState>, List<WireTransaction>> {
|
||||
val ap = transaction(transactionBuilder = TransactionBuilder(notary = notary)) {
|
||||
output("alice's paper", notary = notary) {
|
||||
CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), owner, amount, TEST_TX_TIME + 7.days)
|
||||
@ -552,7 +552,7 @@ class TwoPartyTradeFlowTests {
|
||||
}
|
||||
}
|
||||
|
||||
val vault = Vault(listOf("alice's paper".outputStateAndRef()))
|
||||
val vault = Vault<ContractState>(listOf("alice's paper".outputStateAndRef()))
|
||||
return Pair(vault, listOf(ap))
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,6 @@ import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.persistence.DataVending
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.testing.MOCK_IDENTITY_SERVICE
|
||||
import net.corda.testing.node.MockNetworkMapCache
|
||||
import net.corda.testing.node.MockStorageService
|
||||
@ -24,9 +23,8 @@ import java.time.Clock
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
@Suppress("LeakingThis")
|
||||
open class MockServiceHubInternal(
|
||||
customVault: VaultService? = null,
|
||||
val customVault: VaultService? = null,
|
||||
val keyManagement: KeyManagementService? = null,
|
||||
val net: MessagingServiceInternal? = null,
|
||||
val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
|
||||
@ -37,7 +35,8 @@ open class MockServiceHubInternal(
|
||||
val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(),
|
||||
val schemas: SchemaService? = NodeSchemaService()
|
||||
) : ServiceHubInternal() {
|
||||
override val vaultService: VaultService = customVault ?: NodeVaultService(this)
|
||||
override val vaultService: VaultService
|
||||
get() = customVault ?: throw UnsupportedOperationException()
|
||||
override val keyManagementService: KeyManagementService
|
||||
get() = keyManagement ?: throw UnsupportedOperationException()
|
||||
override val identityService: IdentityService
|
||||
|
@ -9,8 +9,10 @@ import net.corda.core.flows.FlowLogicRef
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.recordTransactions
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.events.NodeSchedulerService
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
@ -74,7 +76,8 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
countDown = CountDownLatch(1)
|
||||
smmHasRemovedAllFlows = CountDownLatch(1)
|
||||
calls = 0
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
|
||||
@ -82,6 +85,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
val kms = MockKeyManagementService(ALICE_KEY)
|
||||
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.PeerHandle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), database)
|
||||
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
|
||||
|
@ -9,6 +9,7 @@ import net.corda.core.contracts.`issued by`
|
||||
import net.corda.core.crypto.composite
|
||||
import net.corda.core.node.services.TxWritableStorageService
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.core.utilities.LogHelper
|
||||
@ -31,11 +32,12 @@ import kotlin.test.assertEquals
|
||||
class NodeVaultServiceTest {
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
private val dataSourceProps = makeTestDataSourceProperties()
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(NodeVaultService::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
}
|
||||
@ -50,7 +52,7 @@ class NodeVaultServiceTest {
|
||||
fun `states not local to instance`() {
|
||||
databaseTransaction(database) {
|
||||
val services1 = object : MockServices() {
|
||||
override val vaultService: VaultService = NodeVaultService(this)
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
@ -61,12 +63,13 @@ class NodeVaultServiceTest {
|
||||
}
|
||||
services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
val w1 = services1.vaultService.currentVault
|
||||
assertThat(w1.states).hasSize(3)
|
||||
val w1 = services1.vaultService.unconsumedStates<Cash.State>()
|
||||
assertThat(w1).hasSize(3)
|
||||
|
||||
val originalStorage = services1.storageService
|
||||
val originalVault = services1.vaultService
|
||||
val services2 = object : MockServices() {
|
||||
override val vaultService: VaultService = NodeVaultService(this)
|
||||
override val vaultService: VaultService get() = originalVault
|
||||
|
||||
// We need to be able to find the same transactions as before, too.
|
||||
override val storageService: TxWritableStorageService get() = originalStorage
|
||||
@ -79,8 +82,32 @@ class NodeVaultServiceTest {
|
||||
}
|
||||
}
|
||||
|
||||
val w2 = services2.vaultService.currentVault
|
||||
assertThat(w2.states).hasSize(3)
|
||||
val w2 = services2.vaultService.unconsumedStates<Cash.State>()
|
||||
assertThat(w2).hasSize(3)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `states for refs`() {
|
||||
databaseTransaction(database) {
|
||||
val services1 = object : MockServices() {
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
storageService.validatedTransactions.addTransaction(stx)
|
||||
vaultService.notify(stx.tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
val w1 = services1.vaultService.unconsumedStates<Cash.State>()
|
||||
assertThat(w1).hasSize(3)
|
||||
|
||||
val stateRefs = listOf(w1[1].ref, w1[2].ref)
|
||||
val states = services1.vaultService.statesForRefs(stateRefs)
|
||||
assertThat(states).hasSize(2)
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,7 +115,7 @@ class NodeVaultServiceTest {
|
||||
fun addNoteToTransaction() {
|
||||
databaseTransaction(database) {
|
||||
val services = object : MockServices() {
|
||||
override val vaultService: VaultService = NodeVaultService(this)
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
|
@ -3,10 +3,14 @@ package net.corda.node.services
|
||||
import net.corda.contracts.asset.Cash
|
||||
import net.corda.contracts.asset.DUMMY_CASH_ISSUER
|
||||
import net.corda.contracts.testing.fillWithSomeTestCash
|
||||
import net.corda.contracts.testing.fillWithSomeTestDeals
|
||||
import net.corda.contracts.testing.fillWithSomeTestLinearStates
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.composite
|
||||
import net.corda.core.node.recordTransactions
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.consumedStates
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
@ -14,7 +18,10 @@ import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.BOB_KEY
|
||||
import net.corda.testing.BOB_PUBKEY
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.MEGA_CORP_KEY
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
@ -37,13 +44,14 @@ class VaultWithCashTest {
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(NodeVaultService::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
LogHelper.setLevel(VaultWithCashTest::class)
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
databaseTransaction(database) {
|
||||
services = object : MockServices() {
|
||||
override val vaultService: VaultService = NodeVaultService(this)
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
@ -58,7 +66,7 @@ class VaultWithCashTest {
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset(NodeVaultService::class)
|
||||
LogHelper.reset(VaultWithCashTest::class)
|
||||
dataSource.close()
|
||||
}
|
||||
|
||||
@ -68,15 +76,15 @@ class VaultWithCashTest {
|
||||
// Fix the PRNG so that we get the same splits every time.
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
val w = vault.currentVault
|
||||
assertEquals(3, w.states.toList().size)
|
||||
val w = vault.unconsumedStates<Cash.State>()
|
||||
assertEquals(3, w.toList().size)
|
||||
|
||||
val state = w.states.toList()[0].state.data as Cash.State
|
||||
val state = w.toList()[0].state.data
|
||||
assertEquals(30.45.DOLLARS `issued by` DUMMY_CASH_ISSUER, state.amount)
|
||||
assertEquals(services.key.public.composite, state.owner)
|
||||
|
||||
assertEquals(34.70.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[2].state.data as Cash.State).amount)
|
||||
assertEquals(34.85.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[1].state.data as Cash.State).amount)
|
||||
assertEquals(34.70.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.toList()[2].state.data).amount)
|
||||
assertEquals(34.85.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.toList()[1].state.data).amount)
|
||||
}
|
||||
}
|
||||
|
||||
@ -119,7 +127,6 @@ class VaultWithCashTest {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
fun `branching LinearStates fails to verify`() {
|
||||
databaseTransaction(database) {
|
||||
@ -128,8 +135,8 @@ class VaultWithCashTest {
|
||||
|
||||
// Issue a linear state
|
||||
val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite)))
|
||||
addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite)))
|
||||
addOutputState(net.corda.contracts.testing.DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite)))
|
||||
addOutputState(net.corda.contracts.testing.DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite)))
|
||||
signWith(freshKey)
|
||||
signWith(DUMMY_NOTARY_KEY)
|
||||
}.toSignedTransaction()
|
||||
@ -149,7 +156,7 @@ class VaultWithCashTest {
|
||||
|
||||
// Issue a linear state
|
||||
val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite)))
|
||||
addOutputState(net.corda.contracts.testing.DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite)))
|
||||
signWith(freshKey)
|
||||
signWith(DUMMY_NOTARY_KEY)
|
||||
}.toSignedTransaction()
|
||||
@ -157,11 +164,11 @@ class VaultWithCashTest {
|
||||
dummyIssue.toLedgerTransaction(services).verify()
|
||||
|
||||
services.recordTransactions(dummyIssue)
|
||||
assertEquals(1, vault.currentVault.states.toList().size)
|
||||
assertEquals(1, vault.unconsumedStates<net.corda.contracts.testing.DummyLinearContract.State>().size)
|
||||
|
||||
// Move the same state
|
||||
val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite)))
|
||||
addOutputState(net.corda.contracts.testing.DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite)))
|
||||
addInputState(dummyIssue.tx.outRef<LinearState>(0))
|
||||
signWith(DUMMY_NOTARY_KEY)
|
||||
}.toSignedTransaction()
|
||||
@ -169,7 +176,74 @@ class VaultWithCashTest {
|
||||
dummyIssue.toLedgerTransaction(services).verify()
|
||||
|
||||
services.recordTransactions(dummyMove)
|
||||
assertEquals(1, vault.currentVault.states.toList().size)
|
||||
assertEquals(1, vault.unconsumedStates<net.corda.contracts.testing.DummyLinearContract.State>().size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `spending cash in vault of mixed state types works`() {
|
||||
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
databaseTransaction(database) {
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L), ownedBy = freshKey.public.composite)
|
||||
services.fillWithSomeTestCash(100.SWISS_FRANCS, DUMMY_NOTARY, 2, 2, Random(0L))
|
||||
services.fillWithSomeTestCash(100.POUNDS, DUMMY_NOTARY, 1, 1, Random(0L))
|
||||
val cash = vault.unconsumedStates<Cash.State>()
|
||||
cash.forEach { println(it.state.data.amount) }
|
||||
|
||||
services.fillWithSomeTestDeals(listOf("123","456","789"))
|
||||
val deals = vault.unconsumedStates<net.corda.contracts.testing.DummyDealContract.State>()
|
||||
deals.forEach { println(it.state.data.ref) }
|
||||
}
|
||||
|
||||
databaseTransaction(database) {
|
||||
// A tx that spends our money.
|
||||
val spendTX = TransactionType.General.Builder(DUMMY_NOTARY).apply {
|
||||
vault.generateSpend(this, 80.DOLLARS, BOB_PUBKEY)
|
||||
signWith(freshKey)
|
||||
signWith(DUMMY_NOTARY_KEY)
|
||||
}.toSignedTransaction()
|
||||
services.recordTransactions(spendTX)
|
||||
|
||||
val consumedStates = vault.consumedStates<ContractState>()
|
||||
assertEquals(3, consumedStates.count())
|
||||
|
||||
val unconsumedStates = vault.unconsumedStates<ContractState>()
|
||||
assertEquals(7, unconsumedStates.count())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consuming multiple contract state types in same transaction`() {
|
||||
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
databaseTransaction(database) {
|
||||
|
||||
services.fillWithSomeTestDeals(listOf("123","456","789"))
|
||||
val deals = vault.unconsumedStates<net.corda.contracts.testing.DummyDealContract.State>()
|
||||
deals.forEach { println(it.state.data.ref) }
|
||||
|
||||
services.fillWithSomeTestLinearStates(3)
|
||||
val linearStates = vault.unconsumedStates<net.corda.contracts.testing.DummyLinearContract.State>()
|
||||
linearStates.forEach { println(it.state.data.linearId) }
|
||||
|
||||
// Create a txn consuming different contract types
|
||||
val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
addOutputState(net.corda.contracts.testing.DummyLinearContract.State(participants = listOf(freshKey.public.composite)))
|
||||
addOutputState(net.corda.contracts.testing.DummyDealContract.State(ref = "999", participants = listOf(freshKey.public.composite)))
|
||||
addInputState(linearStates[0])
|
||||
addInputState(deals[0])
|
||||
signWith(DUMMY_NOTARY_KEY)
|
||||
}.toSignedTransaction()
|
||||
|
||||
dummyMove.toLedgerTransaction(services).verify()
|
||||
services.recordTransactions(dummyMove)
|
||||
|
||||
val consumedStates = vault.consumedStates<ContractState>()
|
||||
assertEquals(2, consumedStates.count())
|
||||
|
||||
val unconsumedStates = vault.unconsumedStates<ContractState>()
|
||||
assertEquals(6, unconsumedStates.count())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,172 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import io.requery.Persistable
|
||||
import io.requery.kotlin.eq
|
||||
import io.requery.sql.KotlinEntityDataStore
|
||||
import net.corda.core.contracts.DummyContract
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionType
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.NullPublicKey
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.serialization.createKryo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.core.utilities.DUMMY_PUBKEY_1
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.vault.schemas.Models
|
||||
import net.corda.node.services.vault.schemas.VaultCashBalancesEntity
|
||||
import net.corda.node.services.vault.schemas.VaultSchema
|
||||
import net.corda.node.services.vault.schemas.VaultStatesEntity
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
class RequeryConfigurationTest {
|
||||
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var transactionStorage: DBTransactionStorage
|
||||
lateinit var requerySession: KotlinEntityDataStore<Persistable>
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProperties)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
newTransactionStorage()
|
||||
newRequeryStorage(dataSourceProperties)
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
dataSource.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transaction inserts in same DB transaction scope across two persistence engines`() {
|
||||
val txn = newTransaction()
|
||||
|
||||
databaseTransaction(database) {
|
||||
transactionStorage.addTransaction(txn)
|
||||
requerySession.withTransaction {
|
||||
insert(createVaultStateEntity(txn))
|
||||
}
|
||||
}
|
||||
|
||||
databaseTransaction(database) {
|
||||
Assertions.assertThat(transactionStorage.transactions).containsOnly(txn)
|
||||
requerySession.withTransaction {
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString())
|
||||
Assertions.assertThat(result.get().first().txId).isEqualTo(txn.tx.inputs[0].txhash.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transaction operations in same DB transaction scope across two persistence engines`() {
|
||||
val txn = newTransaction()
|
||||
|
||||
databaseTransaction(database) {
|
||||
transactionStorage.addTransaction(txn)
|
||||
requerySession.withTransaction {
|
||||
upsert(createCashBalance())
|
||||
select(VaultSchema.VaultCashBalances::class).get().first()
|
||||
insert(createVaultStateEntity(txn))
|
||||
}
|
||||
}
|
||||
|
||||
databaseTransaction(database) {
|
||||
Assertions.assertThat(transactionStorage.transactions).containsOnly(txn)
|
||||
requerySession.withTransaction {
|
||||
val cashQuery = select(VaultSchema.VaultCashBalances::class) where (VaultSchema.VaultCashBalances::currency eq "GBP")
|
||||
assertEquals(12345, cashQuery.get().first().amount)
|
||||
val stateQuery = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString())
|
||||
Assertions.assertThat(stateQuery.get().first().txId).isEqualTo(txn.tx.inputs[0].txhash.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transaction rollback in same DB transaction scope across two persistence engines`() {
|
||||
val txn = newTransaction()
|
||||
|
||||
databaseTransaction(database) {
|
||||
transactionStorage.addTransaction(txn)
|
||||
requerySession.withTransaction {
|
||||
insert(createVaultStateEntity(txn))
|
||||
}
|
||||
rollback()
|
||||
}
|
||||
|
||||
databaseTransaction(database) {
|
||||
Assertions.assertThat(transactionStorage.transactions).isEmpty()
|
||||
requerySession.withTransaction {
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString())
|
||||
Assertions.assertThat(result.get().count() == 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun createVaultStateEntity(txn: SignedTransaction): VaultStatesEntity {
|
||||
val txnState = txn.tx.inputs[0]
|
||||
val state = VaultStatesEntity().apply {
|
||||
txId = txnState.txhash.toString()
|
||||
index = txnState.index
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED
|
||||
contractStateClassName = DummyContract.SingleOwnerState::class.java.name
|
||||
contractState = DummyContract.SingleOwnerState(owner = DUMMY_PUBKEY_1).serialize(createKryo()).bytes
|
||||
notaryName = txn.tx.notary!!.name
|
||||
notaryKey = txn.tx.notary!!.owningKey.toBase58String()
|
||||
recordedTime = Instant.now()
|
||||
}
|
||||
return state
|
||||
}
|
||||
|
||||
private fun createCashBalance(): VaultCashBalancesEntity {
|
||||
val cashBalanceEntity = VaultCashBalancesEntity()
|
||||
cashBalanceEntity.currency = "GBP"
|
||||
cashBalanceEntity.amount = 12345
|
||||
return cashBalanceEntity
|
||||
}
|
||||
|
||||
private fun newTransactionStorage() {
|
||||
databaseTransaction(database) {
|
||||
transactionStorage = DBTransactionStorage()
|
||||
}
|
||||
}
|
||||
|
||||
private fun newRequeryStorage(dataSourceProperties: Properties) {
|
||||
databaseTransaction(database) {
|
||||
val configuration = RequeryConfiguration(dataSourceProperties, true)
|
||||
requerySession = configuration.sessionForModel(Models.VAULT)
|
||||
}
|
||||
}
|
||||
|
||||
private fun newTransaction(): SignedTransaction {
|
||||
val wtx = WireTransaction(
|
||||
inputs = listOf(StateRef(SecureHash.randomSHA256(), 0)),
|
||||
attachments = emptyList(),
|
||||
outputs = emptyList(),
|
||||
commands = emptyList(),
|
||||
notary = DUMMY_NOTARY,
|
||||
signers = emptyList(),
|
||||
type = TransactionType.General(),
|
||||
timestamp = null
|
||||
)
|
||||
return SignedTransaction(wtx.serialized, listOf(DigitalSignature.WithKey(NullPublicKey, ByteArray(1))), wtx.id)
|
||||
}
|
||||
}
|
@ -92,6 +92,21 @@ class DBTransactionStorageTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `two transactions with rollback`() {
|
||||
val firstTransaction = newTransaction()
|
||||
val secondTransaction = newTransaction()
|
||||
databaseTransaction(database) {
|
||||
transactionStorage.addTransaction(firstTransaction)
|
||||
transactionStorage.addTransaction(secondTransaction)
|
||||
rollback()
|
||||
}
|
||||
|
||||
databaseTransaction(database) {
|
||||
assertThat(transactionStorage.transactions).isEmpty()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `two transactions in same DB transaction scope`() {
|
||||
val firstTransaction = newTransaction()
|
||||
|
@ -8,6 +8,7 @@ import net.corda.core.contracts.TransactionType
|
||||
import net.corda.core.contracts.USD
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.flows.BroadcastTransactionFlow.NotifyTxRequest
|
||||
@ -47,12 +48,12 @@ class DataVendingServiceTests {
|
||||
ptx.signWith(registerKey)
|
||||
val tx = ptx.toSignedTransaction()
|
||||
databaseTransaction(vaultServiceNode.database) {
|
||||
assertEquals(0, vaultServiceNode.services.vaultService.currentVault.states.toList().size)
|
||||
assertEquals(0, vaultServiceNode.services.vaultService.unconsumedStates<Cash.State>().size)
|
||||
|
||||
registerNode.sendNotifyTx(tx, vaultServiceNode)
|
||||
|
||||
// Check the transaction is in the receiving node
|
||||
val actual = vaultServiceNode.services.vaultService.currentVault.states.singleOrNull()
|
||||
val actual = vaultServiceNode.services.vaultService.unconsumedStates<Cash.State>().singleOrNull()
|
||||
val expected = tx.tx.outRef<Cash.State>(0)
|
||||
|
||||
assertEquals(expected, actual)
|
||||
@ -78,12 +79,12 @@ class DataVendingServiceTests {
|
||||
ptx.signWith(registerKey)
|
||||
val tx = ptx.toSignedTransaction(false)
|
||||
databaseTransaction(vaultServiceNode.database) {
|
||||
assertEquals(0, vaultServiceNode.services.vaultService.currentVault.states.toList().size)
|
||||
assertEquals(0, vaultServiceNode.services.vaultService.unconsumedStates<Cash.State>().size)
|
||||
|
||||
registerNode.sendNotifyTx(tx, vaultServiceNode)
|
||||
|
||||
// Check the transaction is not in the receiving node
|
||||
assertEquals(0, vaultServiceNode.services.vaultService.currentVault.states.toList().size)
|
||||
assertEquals(0, vaultServiceNode.services.vaultService.unconsumedStates<Cash.State>().size)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user