mirror of
https://github.com/corda/corda.git
synced 2025-01-24 21:37:05 +00:00
Requery removal (#1276)
* Removed Requery object relational mapping usage (and associated schemas including node-schemas module) * Fixed issues with NodeAttachmentService tests. Cannot use JPA custom converters with Primary Key fields. Hibernate entities require explicit call to flush() to persist to disk. * Removed redundant requery converters (equivalents not even required in Hibernate). * Removed remaining gradle requery dependency definitions. * Fixed broken tests. * Fixes for failing NodeVaultService tests: - Dynamic SQL updates (in soft locking code) - Explicit request by session to participate in transaction (causing "TransactionRequiredException" Executing an update/delete query) - Explicit flush() required to persist to disk * Updated changelog. Fixed compiler warning. * Fixed WHERE clause AND/OR condition. Enforced immediate data visibility through transaction commit. * Final fixes to address failing tests. * Deferred all hibernate session/txn management to DatabaseTransactionManager. * Fixed transaction boundaries in failing Cash tests. * Fixes to address failing tests (transaction boundaries, merge detached object, config clean-up). * Final adjustment to transaction boundaries in JUnit tests. * Refactored AttachmentSchemaV1 into NodeAttachmentService itself and referenced from NodeServicesV1. * Refactored HSQL UPDATE statements to use CriteriaUpdate API. * Updated all criteria API getters to reference attribute names by type. * Remove redundant VaultSchema entity name (required when previously using HSQL UPDATE syntax) * Fix compiler warnings. * Minor changes following rebase from master. * Fixed suppress warning type.
This commit is contained in:
parent
56a84882a7
commit
a2ede0fc73
@ -42,7 +42,6 @@ buildscript {
|
||||
ext.hibernate_version = '5.2.6.Final'
|
||||
ext.h2_version = '1.4.194'
|
||||
ext.rxjava_version = '1.2.4'
|
||||
ext.requery_version = '1.3.1'
|
||||
ext.dokka_version = '0.9.14'
|
||||
ext.eddsa_version = '0.2.0'
|
||||
|
||||
@ -249,7 +248,7 @@ bintrayConfig {
|
||||
projectUrl = 'https://github.com/corda/corda'
|
||||
gpgSign = true
|
||||
gpgPassphrase = System.getenv('CORDA_BINTRAY_GPG_PASSPHRASE')
|
||||
publications = ['corda-jfx', 'corda-mock', 'corda-rpc', 'corda-core', 'corda', 'corda-finance', 'corda-node', 'corda-node-api', 'corda-node-schemas', 'corda-test-common', 'corda-test-utils', 'corda-jackson', 'corda-verifier', 'corda-webserver-impl', 'corda-webserver']
|
||||
publications = ['corda-jfx', 'corda-mock', 'corda-rpc', 'corda-core', 'corda', 'corda-finance', 'corda-node', 'corda-node-api', 'corda-test-common', 'corda-test-utils', 'corda-jackson', 'corda-verifier', 'corda-webserver-impl', 'corda-webserver']
|
||||
license {
|
||||
name = 'Apache-2.0'
|
||||
url = 'https://www.apache.org/licenses/LICENSE-2.0'
|
||||
|
@ -63,9 +63,6 @@ dependencies {
|
||||
|
||||
// JPA 2.1 annotations.
|
||||
compile "org.hibernate.javax.persistence:hibernate-jpa-2.1-api:1.0.0.Final"
|
||||
|
||||
// Requery: SQL based query & persistence for Kotlin
|
||||
compile "io.requery:requery-kotlin:$requery_version"
|
||||
}
|
||||
|
||||
configurations {
|
||||
|
@ -1,6 +1,5 @@
|
||||
package net.corda.core.schemas
|
||||
|
||||
import io.requery.Persistable
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
@ -69,4 +68,4 @@ data class PersistentStateRef(
|
||||
/**
|
||||
* Marker interface to denote a persistable Corda state entity that will always have a transaction id and index
|
||||
*/
|
||||
interface StatePersistable : Persistable
|
||||
interface StatePersistable
|
@ -1,27 +0,0 @@
|
||||
package net.corda.core.schemas.requery
|
||||
|
||||
import io.requery.Key
|
||||
import io.requery.Persistable
|
||||
import io.requery.Superclass
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.schemas.StatePersistable
|
||||
|
||||
import javax.persistence.Column
|
||||
|
||||
object Requery {
|
||||
/**
|
||||
* A super class for all mapped states exported to a schema that ensures the [StateRef] appears on the database row. The
|
||||
* [StateRef] will be set to the correct value by the framework (there's no need to set during mapping generation by the state itself).
|
||||
*/
|
||||
// TODO: this interface will supercede the existing [PersistentState] interface defined in PersistentTypes.kt
|
||||
// once we cut-over all existing Hibernate ContractState persistence to Requery
|
||||
@Superclass interface PersistentState : StatePersistable {
|
||||
@get:Key
|
||||
@get:Column(name = "transaction_id", length = 64)
|
||||
var txId: String
|
||||
|
||||
@get:Key
|
||||
@get:Column(name = "output_index")
|
||||
var index: Int
|
||||
}
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
package net.corda.core.schemas.requery.converters
|
||||
|
||||
import io.requery.Converter
|
||||
import java.sql.Blob
|
||||
import javax.sql.rowset.serial.SerialBlob
|
||||
|
||||
/**
|
||||
* Converts from a [ByteArray] to a [Blob].
|
||||
*/
|
||||
class BlobConverter : Converter<ByteArray, Blob> {
|
||||
|
||||
override fun getMappedType(): Class<ByteArray> = ByteArray::class.java
|
||||
|
||||
override fun getPersistedType(): Class<Blob> = Blob::class.java
|
||||
|
||||
/**
|
||||
* creates BLOB(INT.MAX) = 2 GB
|
||||
*/
|
||||
override fun getPersistedSize(): Int? = null
|
||||
|
||||
override fun convertToPersisted(value: ByteArray?): Blob? {
|
||||
return value?.let { SerialBlob(value) }
|
||||
}
|
||||
|
||||
override fun convertToMapped(type: Class<out ByteArray>?, value: Blob?): ByteArray? {
|
||||
return value?.getBytes(1, value.length().toInt())
|
||||
}
|
||||
}
|
@ -1,22 +0,0 @@
|
||||
package net.corda.core.schemas.requery.converters
|
||||
|
||||
import io.requery.Converter
|
||||
import java.sql.Timestamp
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* Converts from a [Instant] to a [java.sql.Timestamp] for Java 8. Note that
|
||||
* when converting between the time type and the database type all times will be converted to the
|
||||
* UTC zone offset.
|
||||
*/
|
||||
class InstantConverter : Converter<Instant, Timestamp> {
|
||||
override fun getMappedType() = Instant::class.java
|
||||
|
||||
override fun getPersistedType() = Timestamp::class.java
|
||||
|
||||
override fun getPersistedSize() = null
|
||||
|
||||
override fun convertToPersisted(value: Instant?) = value?.let { Timestamp.from(it) }
|
||||
|
||||
override fun convertToMapped(type: Class<out Instant>, value: Timestamp?) = value?.toInstant()
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
package net.corda.core.schemas.requery.converters
|
||||
|
||||
import io.requery.Converter
|
||||
import net.corda.core.crypto.SecureHash
|
||||
|
||||
/**
|
||||
* Convert from a [SecureHash] to a [String]
|
||||
*/
|
||||
class SecureHashConverter : Converter<SecureHash, String> {
|
||||
|
||||
override fun getMappedType(): Class<SecureHash> = SecureHash::class.java
|
||||
|
||||
override fun getPersistedType(): Class<String> = String::class.java
|
||||
|
||||
/**
|
||||
* SecureHash consists of 32 bytes which need VARCHAR(64) in hex
|
||||
* TODO: think about other hash widths
|
||||
*/
|
||||
override fun getPersistedSize(): Int? = 64
|
||||
|
||||
override fun convertToPersisted(value: SecureHash?): String? {
|
||||
return value?.toString()
|
||||
}
|
||||
|
||||
override fun convertToMapped(type: Class<out SecureHash>, value: String?): SecureHash? {
|
||||
return value?.let { SecureHash.parse(value) }
|
||||
}
|
||||
}
|
@ -1,21 +0,0 @@
|
||||
package net.corda.core.schemas.requery.converters
|
||||
|
||||
import io.requery.Converter
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
|
||||
/**
|
||||
* Converts from a [StateRef] to a Composite Key defined by a [String] txnHash and an [Int] index
|
||||
*/
|
||||
class StateRefConverter : Converter<StateRef, Pair<String, Int>> {
|
||||
override fun getMappedType() = StateRef::class.java
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun getPersistedType() = Pair::class.java as Class<Pair<String, Int>>
|
||||
|
||||
override fun getPersistedSize() = null
|
||||
|
||||
override fun convertToPersisted(value: StateRef?) = value?.let { Pair(it.txhash.toString(), it.index) }
|
||||
|
||||
override fun convertToMapped(type: Class<out StateRef>, value: Pair<String, Int>?) = value?.let { StateRef(SecureHash.parse(it.first), it.second) }
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
package net.corda.core.schemas.requery.converters
|
||||
|
||||
import io.requery.converter.EnumOrdinalConverter
|
||||
import net.corda.core.node.services.Vault
|
||||
|
||||
/**
|
||||
* Converter which persists a [Vault.StateStatus] enum using its enum ordinal representation
|
||||
*/
|
||||
class VaultStateStatusConverter : EnumOrdinalConverter<Vault.StateStatus>(Vault.StateStatus::class.java)
|
@ -4,20 +4,18 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FetchAttachmentsFlow
|
||||
import net.corda.core.internal.FetchDataFlow
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.makeTestDatabaseProperties
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -32,13 +30,10 @@ import kotlin.test.assertFailsWith
|
||||
|
||||
class AttachmentTests {
|
||||
lateinit var mockNet: MockNetwork
|
||||
lateinit var configuration: RequeryConfiguration
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
mockNet = MockNetwork()
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
configuration = RequeryConfiguration(dataSourceProperties, databaseProperties = makeTestDatabaseProperties())
|
||||
}
|
||||
|
||||
@After
|
||||
@ -137,11 +132,9 @@ class AttachmentTests {
|
||||
val corruptBytes = "arggghhhh".toByteArray()
|
||||
System.arraycopy(corruptBytes, 0, attachment, 0, corruptBytes.size)
|
||||
|
||||
val corruptAttachment = AttachmentEntity()
|
||||
corruptAttachment.attId = id
|
||||
corruptAttachment.content = attachment
|
||||
val corruptAttachment = NodeAttachmentService.DBAttachment(attId = id.toString(), content = attachment)
|
||||
n0.database.transaction {
|
||||
n0.attachments.session.update(corruptAttachment)
|
||||
DatabaseTransactionManager.current().session.update(corruptAttachment)
|
||||
}
|
||||
|
||||
// Get n1 to fetch the attachment. Should receive corrupted bytes.
|
||||
|
@ -49,8 +49,8 @@ class ContractUpgradeFlowTest {
|
||||
notary = nodes.notaryNode.info.notaryIdentity
|
||||
|
||||
val nodeIdentity = nodes.notaryNode.info.legalIdentitiesAndCerts.single { it.party == nodes.notaryNode.info.notaryIdentity }
|
||||
a.services.identityService.registerIdentity(nodeIdentity)
|
||||
b.services.identityService.registerIdentity(nodeIdentity)
|
||||
a.services.identityService.verifyAndRegisterIdentity(nodeIdentity)
|
||||
b.services.identityService.verifyAndRegisterIdentity(nodeIdentity)
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -18,7 +18,7 @@ import net.corda.node.internal.InitiatedFlowFactory
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
@ -53,13 +53,11 @@ private fun MockNetwork.MockNode.hackAttachment(attachmentId: SecureHash, conten
|
||||
* @see NodeAttachmentService.importAttachment
|
||||
*/
|
||||
private fun NodeAttachmentService.updateAttachment(attachmentId: SecureHash, data: ByteArray) {
|
||||
with(session) {
|
||||
withTransaction {
|
||||
update(AttachmentEntity().apply {
|
||||
attId = attachmentId
|
||||
content = data
|
||||
})
|
||||
}
|
||||
val session = DatabaseTransactionManager.current().session
|
||||
val attachment = session.get<NodeAttachmentService.DBAttachment>(NodeAttachmentService.DBAttachment::class.java, attachmentId.toString())
|
||||
attachment?.let {
|
||||
attachment.content = data
|
||||
session.save(attachment)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,11 @@ from the previous milestone release.
|
||||
UNRELEASED
|
||||
----------
|
||||
|
||||
* Removed usage of Requery ORM library (repalced with JPA/Hibernate)
|
||||
|
||||
* Vault Query performance improvement (replaced expensive per query SQL statement to obtain concrete state types
|
||||
with single query on start-up followed by dynamic updates using vault state observable))
|
||||
|
||||
* Vault Query fix: filter by multiple issuer names in ``FungibleAssetQueryCriteria``
|
||||
|
||||
* Following deprecated methods have been removed:
|
||||
|
@ -20,7 +20,6 @@ The Corda repository comprises the following folders:
|
||||
* **lib** contains some dependencies
|
||||
* **node** contains the core code of the Corda node (eg: node driver, node services, messaging, persistence)
|
||||
* **node-api** contains data structures shared between the node and the client module, e.g. types sent via RPC
|
||||
* **node-schemas** contains entity classes used to represent relational database tables
|
||||
* **samples** contains all our Corda demos and code samples
|
||||
* **test-utils** contains some utilities for unit testing contracts ( the contracts testing DSL) and protocols (the
|
||||
mock network) implementation
|
||||
|
@ -132,7 +132,6 @@ object TopupIssuerFlow {
|
||||
val notaryParty = serviceHub.networkMapCache.notaryNodes[0].notaryIdentity
|
||||
// invoke Cash subflow to issue Asset
|
||||
progressTracker.currentStep = ISSUING
|
||||
val issuer = serviceHub.myInfo.legalIdentity
|
||||
val issueCashFlow = CashIssueFlow(amount, issuerPartyRef, notaryParty)
|
||||
val issueTx = subFlow(issueCashFlow)
|
||||
// NOTE: issueCashFlow performs a Broadcast (which stores a local copy of the txn to the ledger)
|
||||
|
@ -14,7 +14,6 @@ dependencies {
|
||||
// Note the :finance module is a CorDapp in its own right
|
||||
// and CorDapps using :finance features should use 'cordapp' not 'compile' linkage.
|
||||
cordaCompile project(':core')
|
||||
cordaCompile project(':node-schemas')
|
||||
|
||||
testCompile project(':test-utils')
|
||||
testCompile project(path: ':core', configuration: 'testArtifacts')
|
||||
|
@ -68,7 +68,8 @@ class CashTests : TestDependencyInjectionBase() {
|
||||
ownedBy = OUR_IDENTITY_1, issuedBy = MINI_CORP.ref(1), issuerServices = miniCorpServices)
|
||||
miniCorpServices.fillWithSomeTestCash(howMuch = 80.SWISS_FRANCS, atLeastThisManyStates = 1, atMostThisManyStates = 1,
|
||||
ownedBy = OUR_IDENTITY_1, issuedBy = MINI_CORP.ref(1), issuerServices = miniCorpServices)
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
vaultStatesUnconsumed = miniCorpServices.vaultQueryService.queryBy<Cash.State>().states
|
||||
}
|
||||
resetTestSerialization()
|
||||
@ -560,9 +561,11 @@ class CashTests : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun generateSimpleDirectSpend() {
|
||||
initialiseTestSerialization()
|
||||
val wtx =
|
||||
database.transaction {
|
||||
makeSpend(100.DOLLARS, THEIR_IDENTITY_1)
|
||||
}
|
||||
database.transaction {
|
||||
val wtx = makeSpend(100.DOLLARS, THEIR_IDENTITY_1)
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val vaultState = vaultStatesUnconsumed.elementAt(0)
|
||||
assertEquals(vaultState.ref, wtx.inputs[0])
|
||||
@ -586,10 +589,11 @@ class CashTests : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun generateSimpleSpendWithChange() {
|
||||
initialiseTestSerialization()
|
||||
val wtx =
|
||||
database.transaction {
|
||||
makeSpend(10.DOLLARS, THEIR_IDENTITY_1)
|
||||
}
|
||||
database.transaction {
|
||||
|
||||
val wtx = makeSpend(10.DOLLARS, THEIR_IDENTITY_1)
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val vaultState = vaultStatesUnconsumed.elementAt(0)
|
||||
assertEquals(vaultState.ref, wtx.inputs[0])
|
||||
@ -602,9 +606,11 @@ class CashTests : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun generateSpendWithTwoInputs() {
|
||||
initialiseTestSerialization()
|
||||
val wtx =
|
||||
database.transaction {
|
||||
makeSpend(500.DOLLARS, THEIR_IDENTITY_1)
|
||||
}
|
||||
database.transaction {
|
||||
val wtx = makeSpend(500.DOLLARS, THEIR_IDENTITY_1)
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val vaultState0 = vaultStatesUnconsumed.elementAt(0)
|
||||
val vaultState1 = vaultStatesUnconsumed.elementAt(1)
|
||||
@ -618,10 +624,13 @@ class CashTests : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun generateSpendMixedDeposits() {
|
||||
initialiseTestSerialization()
|
||||
val wtx =
|
||||
database.transaction {
|
||||
val wtx = makeSpend(580.DOLLARS, THEIR_IDENTITY_1)
|
||||
assertEquals(3, wtx.inputs.size)
|
||||
wtx
|
||||
}
|
||||
database.transaction {
|
||||
val wtx = makeSpend(580.DOLLARS, THEIR_IDENTITY_1)
|
||||
assertEquals(3, wtx.inputs.size)
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val vaultState0 = vaultStatesUnconsumed.elementAt(0)
|
||||
val vaultState1 = vaultStatesUnconsumed.elementAt(1)
|
||||
|
@ -8,7 +8,6 @@ import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import io.requery.util.CloseableIterator
|
||||
import net.corda.core.internal.LazyPool
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
@ -103,14 +102,9 @@ open class SerializationFactoryImpl : SerializationFactory {
|
||||
|
||||
private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>() {
|
||||
override fun write(kryo: Kryo, output: Output, closeable: AutoCloseable) {
|
||||
val message = if (closeable is CloseableIterator<*>) {
|
||||
"A live Iterator pointing to the database has been detected during flow checkpointing. This may be due " +
|
||||
"to a Vault query - move it into a private method."
|
||||
} else {
|
||||
"${closeable.javaClass.name}, which is a closeable resource, has been detected during flow checkpointing. " +
|
||||
val message = "${closeable.javaClass.name}, which is a closeable resource, has been detected during flow checkpointing. " +
|
||||
"Restoring such resources across node restarts is not supported. Make sure code accessing it is " +
|
||||
"confined to a private method or the reference is nulled out."
|
||||
}
|
||||
throw UnsupportedOperationException(message)
|
||||
}
|
||||
|
||||
|
@ -1,37 +0,0 @@
|
||||
apply plugin: 'net.corda.plugins.publish-utils'
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: 'kotlin-kapt'
|
||||
apply plugin: 'idea'
|
||||
apply plugin: 'com.jfrog.artifactory'
|
||||
|
||||
description 'Corda node database schemas'
|
||||
|
||||
dependencies {
|
||||
compile project(':core')
|
||||
|
||||
// Requery: SQL based query & persistence for Kotlin
|
||||
kapt "io.requery:requery-processor:$requery_version"
|
||||
|
||||
testCompile project(':test-utils')
|
||||
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
|
||||
testCompile "junit:junit:$junit_version"
|
||||
|
||||
// For H2 database support in persistence
|
||||
testCompile "com.h2database:h2:$h2_version"
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
main {
|
||||
kotlin {
|
||||
srcDir "$buildDir/generated/source/kapt/main"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
jar {
|
||||
baseName 'corda-node-schemas'
|
||||
}
|
||||
|
||||
publish {
|
||||
name jar.baseName
|
||||
}
|
@ -1,18 +0,0 @@
|
||||
package net.corda.node.services.persistence.schemas.requery
|
||||
|
||||
import io.requery.*
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.schemas.requery.converters.BlobConverter
|
||||
|
||||
@Table(name = "attachments")
|
||||
@Entity(model = "persistence")
|
||||
interface Attachment : Persistable {
|
||||
|
||||
@get:Key
|
||||
@get:Column(name = "att_id", index = true)
|
||||
var attId: SecureHash
|
||||
|
||||
@get:Column(name = "content")
|
||||
@get:Convert(BlobConverter::class)
|
||||
var content: ByteArray
|
||||
}
|
@ -1,74 +0,0 @@
|
||||
package net.corda.node.services.vault.schemas.requery
|
||||
|
||||
import io.requery.*
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.schemas.requery.Requery
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
object VaultSchema {
|
||||
|
||||
@Table(name = "vault_transaction_notes")
|
||||
@Entity(model = "vault")
|
||||
interface VaultTxnNote : Persistable {
|
||||
@get:Key
|
||||
@get:Generated
|
||||
@get:Column(name = "seq_no", index = true)
|
||||
var seqNo: Int
|
||||
|
||||
@get:Column(name = "transaction_id", length = 64, index = true)
|
||||
var txId: String
|
||||
|
||||
@get:Column(name = "note")
|
||||
var note: String
|
||||
}
|
||||
|
||||
@Table(name = "vault_cash_balances")
|
||||
@Entity(model = "vault")
|
||||
interface VaultCashBalances : Persistable {
|
||||
@get:Key
|
||||
@get:Column(name = "currency_code", length = 3)
|
||||
var currency: String
|
||||
|
||||
@get:Column(name = "amount", value = "0")
|
||||
var amount: Long
|
||||
}
|
||||
|
||||
@Table(name = "vault_states")
|
||||
@Entity(model = "vault")
|
||||
interface VaultStates : Requery.PersistentState {
|
||||
/** refers to the notary a state is attached to */
|
||||
@get:Column(name = "notary_name")
|
||||
var notaryName: String
|
||||
|
||||
/** references a concrete ContractState that is [QueryableState] and has a [MappedSchema] */
|
||||
@get:Column(name = "contract_state_class_name")
|
||||
var contractStateClassName: String
|
||||
|
||||
/** refers to serialized transaction Contract State */
|
||||
// TODO: define contract state size maximum size and adjust length accordingly
|
||||
@get:Column(name = "contract_state", length = 100000)
|
||||
var contractState: ByteArray
|
||||
|
||||
/** state lifecycle: unconsumed, consumed */
|
||||
@get:Column(name = "state_status")
|
||||
var stateStatus: Vault.StateStatus
|
||||
|
||||
/** refers to timestamp recorded upon entering UNCONSUMED state */
|
||||
@get:Column(name = "recorded_timestamp")
|
||||
var recordedTime: Instant
|
||||
|
||||
/** refers to timestamp recorded upon entering CONSUMED state */
|
||||
@get:Column(name = "consumed_timestamp", nullable = true)
|
||||
var consumedTime: Instant?
|
||||
|
||||
/** used to denote a state has been soft locked (to prevent double spend)
|
||||
* will contain a temporary unique [UUID] obtained from a flow session */
|
||||
@get:Column(name = "lock_id", nullable = true)
|
||||
var lockId: String?
|
||||
|
||||
/** refers to the last time a lock was taken (reserved) or updated (released, re-reserved) */
|
||||
@get:Column(name = "lock_timestamp", nullable = true)
|
||||
var lockUpdateTime: Instant?
|
||||
}
|
||||
}
|
@ -1,666 +0,0 @@
|
||||
package net.corda.node.services.vault.schemas
|
||||
|
||||
import io.requery.Persistable
|
||||
import io.requery.TransactionIsolation
|
||||
import io.requery.kotlin.*
|
||||
import io.requery.query.RowExpression
|
||||
import io.requery.rx.KotlinRxEntityStore
|
||||
import io.requery.sql.*
|
||||
import io.requery.sql.platform.Generic
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.composite.CompositeKey
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.toBase58String
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.schemas.requery.converters.InstantConverter
|
||||
import net.corda.core.schemas.requery.converters.VaultStateStatusConverter
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.node.services.vault.schemas.requery.*
|
||||
import net.corda.testing.ALICE
|
||||
import net.corda.testing.BOB
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.TestDependencyInjectionBase
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import org.h2.jdbcx.JdbcDataSource
|
||||
import org.junit.After
|
||||
import org.junit.Assert
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotNull
|
||||
import kotlin.test.assertNull
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class VaultSchemaTest : TestDependencyInjectionBase() {
|
||||
|
||||
var instance: KotlinEntityDataStore<Persistable>? = null
|
||||
val data: KotlinEntityDataStore<Persistable> get() = instance!!
|
||||
|
||||
var oinstance: KotlinRxEntityStore<Persistable>? = null
|
||||
val odata: KotlinRxEntityStore<Persistable> get() = oinstance!!
|
||||
|
||||
var transaction: LedgerTransaction? = null
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
val dataSource = JdbcDataSource()
|
||||
dataSource.setURL("jdbc:h2:mem:vault_persistence;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1")
|
||||
val configuration = KotlinConfiguration(dataSource = dataSource, model = Models.VAULT, mapping = setupCustomMapping(), useDefaultLogging = true)
|
||||
instance = KotlinEntityDataStore<Persistable>(configuration)
|
||||
oinstance = KotlinRxEntityStore(KotlinEntityDataStore<Persistable>(configuration))
|
||||
val tables = SchemaModifier(configuration)
|
||||
val mode = TableCreationMode.DROP_CREATE
|
||||
tables.createTables(mode)
|
||||
|
||||
// create dummy test data
|
||||
setupDummyData()
|
||||
}
|
||||
|
||||
private fun setupCustomMapping(): Mapping? {
|
||||
val mapping = GenericMapping(Generic())
|
||||
val instantConverter = InstantConverter()
|
||||
mapping.addConverter(instantConverter, instantConverter.mappedType)
|
||||
val vaultStateStatusConverter = VaultStateStatusConverter()
|
||||
mapping.addConverter(vaultStateStatusConverter, vaultStateStatusConverter.mappedType)
|
||||
return mapping
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
data.close()
|
||||
}
|
||||
|
||||
private class VaultNoopContract : Contract {
|
||||
data class VaultNoopState(override val owner: AbstractParty) : OwnableState {
|
||||
override val contract = VaultNoopContract()
|
||||
override val participants: List<AbstractParty>
|
||||
get() = listOf(owner)
|
||||
|
||||
override fun withNewOwner(newOwner: AbstractParty) = CommandAndState(Commands.Create(), copy(owner = newOwner))
|
||||
}
|
||||
|
||||
interface Commands : CommandData {
|
||||
class Create : TypeOnlyCommandData(), Commands
|
||||
}
|
||||
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
// Always accepts.
|
||||
}
|
||||
}
|
||||
|
||||
private fun setupDummyData() {
|
||||
// dummy Transaction comprised of 3 different Contract State types
|
||||
// 1. SingleOwnerState
|
||||
// 2. MultiOwnerState
|
||||
// 3. VaultNoopState
|
||||
val notary: Party = DUMMY_NOTARY
|
||||
val inState1 = TransactionState(DummyContract.SingleOwnerState(0, ALICE), notary)
|
||||
val inState2 = TransactionState(DummyContract.MultiOwnerState(0,
|
||||
listOf(ALICE, BOB)), notary)
|
||||
val inState3 = TransactionState(VaultNoopContract.VaultNoopState(ALICE), notary)
|
||||
val outState1 = inState1.copy()
|
||||
val outState2 = inState2.copy()
|
||||
val outState3 = inState3.copy()
|
||||
val inputs = listOf(StateAndRef(inState1, StateRef(SecureHash.randomSHA256(), 0)),
|
||||
StateAndRef(inState2, StateRef(SecureHash.randomSHA256(), 0)),
|
||||
StateAndRef(inState3, StateRef(SecureHash.randomSHA256(), 0)))
|
||||
val outputs = listOf(outState1, outState2, outState3)
|
||||
val commands = emptyList<AuthenticatedObject<CommandData>>()
|
||||
val attachments = emptyList<Attachment>()
|
||||
val id = SecureHash.randomSHA256()
|
||||
val timeWindow: TimeWindow? = null
|
||||
val privacySalt: PrivacySalt = PrivacySalt()
|
||||
transaction = LedgerTransaction(
|
||||
inputs,
|
||||
outputs,
|
||||
commands,
|
||||
attachments,
|
||||
id,
|
||||
notary,
|
||||
timeWindow,
|
||||
privacySalt
|
||||
)
|
||||
}
|
||||
|
||||
private fun createTxnWithTwoStateTypes(): LedgerTransaction {
|
||||
val notary: Party = DUMMY_NOTARY
|
||||
val inState1 = TransactionState(DummyContract.SingleOwnerState(0, ALICE), notary)
|
||||
val inState2 = TransactionState(DummyContract.MultiOwnerState(0,
|
||||
listOf(ALICE, BOB)), notary)
|
||||
val outState1 = inState1.copy()
|
||||
val outState2 = inState2.copy()
|
||||
val state1TxHash = SecureHash.randomSHA256()
|
||||
val state2TxHash = SecureHash.randomSHA256()
|
||||
val inputs = listOf(StateAndRef(inState1, StateRef(state1TxHash, 0)),
|
||||
StateAndRef(inState1, StateRef(state1TxHash, 1)),
|
||||
StateAndRef(inState2, StateRef(state2TxHash, 0)),
|
||||
StateAndRef(inState1, StateRef(state1TxHash, 2))) // bogus state not in db
|
||||
val outputs = listOf(outState1, outState2)
|
||||
val commands = emptyList<AuthenticatedObject<CommandData>>()
|
||||
val attachments = emptyList<Attachment>()
|
||||
val id = SecureHash.randomSHA256()
|
||||
val timeWindow: TimeWindow? = null
|
||||
val privacySalt: PrivacySalt = PrivacySalt()
|
||||
return LedgerTransaction(
|
||||
inputs,
|
||||
outputs,
|
||||
commands,
|
||||
attachments,
|
||||
id,
|
||||
notary,
|
||||
timeWindow,
|
||||
privacySalt
|
||||
)
|
||||
}
|
||||
|
||||
private fun dummyStatesInsert(txn: LedgerTransaction) {
|
||||
data.invoke {
|
||||
// skip inserting the last txn state (to mimic spend attempt of non existent unconsumed state)
|
||||
txn.inputs.subList(0, txn.inputs.lastIndex).forEach {
|
||||
insert(createStateEntity(it))
|
||||
// create additional state entities with idx >0
|
||||
for (i in 3..4) {
|
||||
try {
|
||||
createStateEntity(it, idx = i).apply {
|
||||
insert(this)
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
}
|
||||
}
|
||||
// create additional state entities with different txn id
|
||||
for (i in 1..3) {
|
||||
createStateEntity(it, txHash = SecureHash.randomSHA256().toString()).apply {
|
||||
insert(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
// insert an additional MultiOwnerState with idx 1
|
||||
insert(createStateEntity(txn.inputs[2], idx = 1))
|
||||
|
||||
// insert entities with other state type
|
||||
for (i in 1..5) {
|
||||
VaultStatesEntity().apply {
|
||||
txId = SecureHash.randomSHA256().toString()
|
||||
index = 0
|
||||
contractStateClassName = VaultNoopContract.VaultNoopState::class.java.name
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED
|
||||
insert(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check total numner of inserted states
|
||||
assertEquals(3 + 4 + 9 + 1 + 5, data.select(VaultSchema.VaultStates::class).get().count())
|
||||
}
|
||||
|
||||
/**
|
||||
* Vault Schema: VaultStates
|
||||
*/
|
||||
@Test
|
||||
fun testInsertState() {
|
||||
val state = VaultStatesEntity()
|
||||
state.txId = "12345"
|
||||
state.index = 0
|
||||
data.invoke {
|
||||
insert(state)
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq state.txId)
|
||||
Assert.assertSame(state, result().first())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpsertUnconsumedState() {
|
||||
val stateEntity = createStateEntity(transaction!!.inputs[0])
|
||||
data.invoke {
|
||||
upsert(stateEntity)
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq stateEntity.txId)
|
||||
Assert.assertSame(stateEntity, result().first())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpsertConsumedState() {
|
||||
val stateEntity = createStateEntity(transaction!!.inputs[0])
|
||||
data.invoke {
|
||||
upsert(stateEntity)
|
||||
}
|
||||
val keys = mapOf(VaultStatesEntity.TX_ID to stateEntity.txId,
|
||||
VaultStatesEntity.INDEX to stateEntity.index)
|
||||
val key = io.requery.proxy.CompositeKey(keys)
|
||||
data.invoke {
|
||||
val state = findByKey(VaultStatesEntity::class, key)
|
||||
state?.run {
|
||||
stateStatus = Vault.StateStatus.CONSUMED
|
||||
consumedTime = Instant.now()
|
||||
update(state)
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq state.txId)
|
||||
assertEquals(Vault.StateStatus.CONSUMED, result().first().stateStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testCashBalanceUpdate() {
|
||||
val cashBalanceEntity = VaultCashBalancesEntity()
|
||||
cashBalanceEntity.currency = "USD"
|
||||
cashBalanceEntity.amount = 100
|
||||
data.invoke {
|
||||
val state = findByKey(VaultCashBalancesEntity::class, cashBalanceEntity.currency)
|
||||
assertNull(state)
|
||||
upsert(cashBalanceEntity)
|
||||
}
|
||||
data.invoke {
|
||||
val state = findByKey(VaultCashBalancesEntity::class, cashBalanceEntity.currency)
|
||||
state?.let {
|
||||
state.amount -= 80
|
||||
upsert(state)
|
||||
}
|
||||
assertEquals(20, state!!.amount)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testTransactionalUpsertState() {
|
||||
data.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
transaction!!.inputs.forEach {
|
||||
val stateEntity = createStateEntity(it)
|
||||
insert(stateEntity)
|
||||
}
|
||||
val result = select(VaultSchema.VaultStates::class)
|
||||
Assert.assertSame(3, result().toList().size)
|
||||
}
|
||||
data.invoke {
|
||||
val result = select(VaultSchema.VaultStates::class)
|
||||
Assert.assertSame(3, result().toList().size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testDistinctContractStateTypes() {
|
||||
val txn = createTxnWithTwoStateTypes()
|
||||
dummyStatesInsert(txn)
|
||||
|
||||
data.invoke {
|
||||
transaction!!.inputs.forEach {
|
||||
val stateEntity = createStateEntity(it)
|
||||
insert(stateEntity)
|
||||
}
|
||||
|
||||
val query = select(VaultSchema.VaultStates::contractStateClassName).distinct()
|
||||
val results = query.get()
|
||||
|
||||
Assert.assertSame(3, results.count())
|
||||
}
|
||||
}
|
||||
|
||||
private fun createStateEntity(stateAndRef: StateAndRef<*>, idx: Int? = null, txHash: String? = null): VaultStatesEntity {
|
||||
val stateRef = stateAndRef.ref
|
||||
val state = stateAndRef.state
|
||||
return VaultStatesEntity().apply {
|
||||
txId = txHash ?: stateRef.txhash.toString()
|
||||
index = idx ?: stateRef.index
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED
|
||||
contractStateClassName = state.data.javaClass.name
|
||||
contractState = state.serialize().bytes
|
||||
notaryName = state.notary.name.toString()
|
||||
recordedTime = Instant.now()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Vault Schema: Transaction Notes
|
||||
*/
|
||||
@Test
|
||||
fun testInsertTxnNote() {
|
||||
val txnNoteEntity = VaultTxnNoteEntity()
|
||||
txnNoteEntity.txId = "12345"
|
||||
txnNoteEntity.note = "Sample transaction note"
|
||||
data.invoke {
|
||||
insert(txnNoteEntity)
|
||||
val result = select(VaultSchema.VaultTxnNote::class)
|
||||
Assert.assertSame(txnNoteEntity, result().first())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testFindTxnNote() {
|
||||
val txnNoteEntity = VaultTxnNoteEntity()
|
||||
txnNoteEntity.txId = "12345"
|
||||
txnNoteEntity.note = "Sample transaction note #1"
|
||||
val txnNoteEntity2 = VaultTxnNoteEntity()
|
||||
txnNoteEntity2.txId = "23456"
|
||||
txnNoteEntity2.note = "Sample transaction note #2"
|
||||
data.invoke {
|
||||
insert(txnNoteEntity)
|
||||
insert(txnNoteEntity2)
|
||||
}
|
||||
data.invoke {
|
||||
val result = select(VaultSchema.VaultTxnNote::class) where (VaultSchema.VaultTxnNote::txId eq txnNoteEntity2.txId)
|
||||
assertEquals(result().count(), 1)
|
||||
Assert.assertSame(txnNoteEntity2, result().first())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Vault Schema: Cash Balances
|
||||
*/
|
||||
@Test
|
||||
fun testInsertCashBalance() {
|
||||
val cashBalanceEntity = VaultCashBalancesEntity()
|
||||
cashBalanceEntity.currency = "GPB"
|
||||
cashBalanceEntity.amount = 12345
|
||||
data.invoke {
|
||||
insert(cashBalanceEntity)
|
||||
val result = select(VaultSchema.VaultCashBalances::class)
|
||||
Assert.assertSame(cashBalanceEntity, result().first())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpdateCashBalance() {
|
||||
val cashBalanceEntity = VaultCashBalancesEntity()
|
||||
cashBalanceEntity.currency = "GPB"
|
||||
cashBalanceEntity.amount = 12345
|
||||
data.invoke {
|
||||
insert(cashBalanceEntity)
|
||||
}
|
||||
data.invoke {
|
||||
val state = findByKey(VaultCashBalancesEntity::class, cashBalanceEntity.currency)
|
||||
assertNotNull(state)
|
||||
state?.let {
|
||||
state.amount += 10000
|
||||
update(state)
|
||||
val result = select(VaultCashBalancesEntity::class)
|
||||
assertEquals(22345, result().first().amount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpsertCashBalance() {
|
||||
val cashBalanceEntity = VaultCashBalancesEntity()
|
||||
cashBalanceEntity.currency = "GPB"
|
||||
cashBalanceEntity.amount = 12345
|
||||
data.invoke {
|
||||
val state = findByKey(VaultCashBalancesEntity::class, cashBalanceEntity.currency)
|
||||
state?.let {
|
||||
state.amount += 10000
|
||||
}
|
||||
val result = upsert(state ?: cashBalanceEntity)
|
||||
assertEquals(12345, result.amount)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testAllUnconsumedStates() {
|
||||
data.invoke {
|
||||
transaction!!.inputs.forEach {
|
||||
insert(createStateEntity(it))
|
||||
}
|
||||
}
|
||||
val stateAndRefs = unconsumedStates<ContractState>()
|
||||
assertNotNull(stateAndRefs)
|
||||
assertTrue { stateAndRefs.size == 3 }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun tesUnconsumedDummyStates() {
|
||||
data.invoke {
|
||||
transaction!!.inputs.forEach {
|
||||
insert(createStateEntity(it))
|
||||
}
|
||||
}
|
||||
val stateAndRefs = unconsumedStates<DummyContract.State>()
|
||||
assertNotNull(stateAndRefs)
|
||||
assertTrue { stateAndRefs.size == 2 }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun tesUnconsumedDummySingleOwnerStates() {
|
||||
data.invoke {
|
||||
transaction!!.inputs.forEach {
|
||||
insert(createStateEntity(it))
|
||||
}
|
||||
}
|
||||
val stateAndRefs = unconsumedStates<DummyContract.SingleOwnerState>()
|
||||
assertNotNull(stateAndRefs)
|
||||
assertTrue { stateAndRefs.size == 1 }
|
||||
}
|
||||
|
||||
inline fun <reified T : ContractState> unconsumedStates(): List<StateAndRef<T>> {
|
||||
val stateAndRefs =
|
||||
data.invoke {
|
||||
val result = select(VaultSchema.VaultStates::class)
|
||||
.where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED)
|
||||
result.get()
|
||||
.map { it ->
|
||||
val stateRef = StateRef(SecureHash.parse(it.txId), it.index)
|
||||
val state = it.contractState.deserialize<TransactionState<T>>()
|
||||
StateAndRef(state, stateRef)
|
||||
}.filter {
|
||||
T::class.java.isAssignableFrom(it.state.data.javaClass)
|
||||
}.toList()
|
||||
}
|
||||
return stateAndRefs
|
||||
}
|
||||
|
||||
/**
|
||||
* Observables testing
|
||||
*/
|
||||
@Test
|
||||
@Throws(Exception::class)
|
||||
fun testInsert() {
|
||||
val stateEntity = createStateEntity(transaction!!.inputs[0])
|
||||
val latch = CountDownLatch(1)
|
||||
odata.insert(stateEntity).subscribe {
|
||||
Assert.assertNotNull(it.txId)
|
||||
Assert.assertTrue(it.txId.isNotEmpty())
|
||||
val cached = data.select(VaultSchema.VaultStates::class)
|
||||
.where(VaultSchema.VaultStates::txId.eq(it.txId)).get().first()
|
||||
Assert.assertSame(cached, it)
|
||||
latch.countDown()
|
||||
}
|
||||
latch.await()
|
||||
}
|
||||
|
||||
@Test
|
||||
@Throws(Exception::class)
|
||||
fun testInsertCount() {
|
||||
val stateEntity = createStateEntity(transaction!!.inputs[0])
|
||||
Observable.just(stateEntity)
|
||||
.concatMap { person -> odata.insert(person).toObservable() }
|
||||
odata.insert(stateEntity).toBlocking().value()
|
||||
Assert.assertNotNull(stateEntity.txId)
|
||||
Assert.assertTrue(stateEntity.txId.isNotEmpty())
|
||||
val count = data.count(VaultSchema.VaultStates::class).get().value()
|
||||
Assert.assertEquals(1, count.toLong())
|
||||
}
|
||||
|
||||
@Test
|
||||
@Throws(Exception::class)
|
||||
fun testQueryEmpty() {
|
||||
val latch = CountDownLatch(1)
|
||||
odata.select(VaultSchema.VaultStates::class).get().toObservable()
|
||||
.subscribe({ Assert.fail() }, { Assert.fail() }) { latch.countDown() }
|
||||
if (!latch.await(1, TimeUnit.SECONDS)) {
|
||||
Assert.fail()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Throws(Exception::class)
|
||||
fun testQueryObservable() {
|
||||
transaction!!.inputs.forEach {
|
||||
val stateEntity = createStateEntity(it)
|
||||
odata.insert(stateEntity).toBlocking().value()
|
||||
}
|
||||
val states = ArrayList<VaultStatesEntity>()
|
||||
odata.select(VaultSchema.VaultStates::class).get()
|
||||
.toObservable()
|
||||
.subscribe { it -> states.add(it as VaultStatesEntity) }
|
||||
Assert.assertEquals(3, states.size)
|
||||
}
|
||||
|
||||
/**
|
||||
* Requery composite key tests (using RowExpression introduced in 1.2.1)
|
||||
*/
|
||||
@Test
|
||||
fun testQueryWithCompositeKey() {
|
||||
// txn entity with 4 input states (SingleOwnerState x 3, MultiOwnerState x 1)
|
||||
val txn = createTxnWithTwoStateTypes()
|
||||
dummyStatesInsert(txn)
|
||||
|
||||
data.invoke {
|
||||
val primaryCompositeKey = listOf(VaultStatesEntity.TX_ID, VaultStatesEntity.INDEX)
|
||||
val expression = RowExpression.of(primaryCompositeKey)
|
||||
val stateRefs = txn.inputs.map { listOf("'${it.ref.txhash}'", it.ref.index) }
|
||||
|
||||
val result = select(VaultStatesEntity::class) where (expression.`in`(stateRefs))
|
||||
assertEquals(3, result.get().count())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpdateWithCompositeKey() {
|
||||
// txn entity with 4 input states (SingleOwnerState x 3, MultiOwnerState x 1)
|
||||
val txn = createTxnWithTwoStateTypes()
|
||||
dummyStatesInsert(txn)
|
||||
|
||||
data.invoke {
|
||||
val primaryCompositeKey = listOf(VaultStatesEntity.TX_ID, VaultStatesEntity.INDEX)
|
||||
val expression = RowExpression.of(primaryCompositeKey)
|
||||
val stateRefs = txn.inputs.map { listOf("'${it.ref.txhash}'", it.ref.index) }
|
||||
|
||||
val update = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, "")
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, Instant.now())
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and(expression.`in`(stateRefs)).get()
|
||||
assertEquals(3, update.value())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Soft locking tests
|
||||
*/
|
||||
@Test
|
||||
fun testSingleSoftLockUpdate() {
|
||||
|
||||
// insert unconsumed state
|
||||
val stateEntity = createStateEntity(transaction!!.inputs[0])
|
||||
data.invoke {
|
||||
upsert(stateEntity)
|
||||
}
|
||||
|
||||
// reserve soft lock on state
|
||||
stateEntity.apply {
|
||||
this.lockId = "LOCK#1"
|
||||
this.lockUpdateTime = Instant.now()
|
||||
data.invoke {
|
||||
upsert(stateEntity)
|
||||
}
|
||||
}
|
||||
|
||||
// select unlocked states
|
||||
data.invoke {
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq stateEntity.txId)
|
||||
.and(VaultSchema.VaultStates::lockId.isNull())
|
||||
assertEquals(0, result.get().count())
|
||||
}
|
||||
|
||||
// release soft lock on state
|
||||
data.invoke {
|
||||
val update = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, Instant.now())
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and(VaultStatesEntity.LOCK_ID eq "LOCK#1").get()
|
||||
assertEquals(1, update.value())
|
||||
}
|
||||
|
||||
// select unlocked states
|
||||
data.invoke {
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq stateEntity.txId)
|
||||
.and(VaultSchema.VaultStates::lockId.isNull())
|
||||
assertEquals(1, result.get().count())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testMultipleSoftLocksUpdate() {
|
||||
|
||||
// insert unconsumed state
|
||||
data.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
transaction!!.inputs.forEach {
|
||||
val stateEntity = createStateEntity(it)
|
||||
insert(stateEntity)
|
||||
}
|
||||
val result = select(VaultSchema.VaultStates::class)
|
||||
Assert.assertSame(3, result().toList().size)
|
||||
}
|
||||
|
||||
// reserve soft locks on states
|
||||
transaction!!.inputs.forEach {
|
||||
val stateEntity = createStateEntity(it)
|
||||
stateEntity.apply {
|
||||
this.lockId = "LOCK#1"
|
||||
this.lockUpdateTime = Instant.now()
|
||||
data.invoke {
|
||||
upsert(stateEntity)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// select unlocked states
|
||||
val txnIds = transaction!!.inputs.map { it.ref.txhash.toString() }.toSet()
|
||||
data.invoke {
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId `in` txnIds)
|
||||
.and(VaultSchema.VaultStates::lockId eq "")
|
||||
assertEquals(0, result.get().count())
|
||||
}
|
||||
|
||||
// release soft lock on states
|
||||
data.invoke {
|
||||
val primaryCompositeKey = listOf(VaultStatesEntity.TX_ID, VaultStatesEntity.INDEX)
|
||||
val expression = RowExpression.of(primaryCompositeKey)
|
||||
val stateRefs = transaction!!.inputs.map { listOf("'${it.ref.txhash}'", it.ref.index) }
|
||||
|
||||
val update = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, "")
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, Instant.now())
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and(expression.`in`(stateRefs)).get()
|
||||
assertEquals(3, update.value())
|
||||
}
|
||||
|
||||
// select unlocked states
|
||||
data.invoke {
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId `in` txnIds)
|
||||
.and(VaultSchema.VaultStates::lockId eq "")
|
||||
assertEquals(3, result.get().count())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun insertWithBigCompositeKey() {
|
||||
val vaultStEntity = VaultStatesEntity().apply {
|
||||
txId = SecureHash.randomSHA256().toString()
|
||||
index = 314
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED
|
||||
contractStateClassName = VaultNoopContract.VaultNoopState::class.java.name
|
||||
notaryName = "Huge distributed notary"
|
||||
recordedTime = Instant.now()
|
||||
}
|
||||
data.insert(vaultStEntity)
|
||||
assertEquals(1, data.select(VaultSchema.VaultStates::class).get().count())
|
||||
}
|
||||
}
|
@ -76,7 +76,6 @@ processSmokeTestResources {
|
||||
// build/reports/project/dependencies/index.html for green highlighted parts of the tree.
|
||||
|
||||
dependencies {
|
||||
compile project(':node-schemas')
|
||||
compile project(':node-api')
|
||||
compile project(':client:rpc')
|
||||
compile project(':cordform-common')
|
||||
@ -180,9 +179,6 @@ dependencies {
|
||||
compile 'commons-codec:commons-codec:1.10'
|
||||
compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87'
|
||||
|
||||
// Requery: object mapper for Kotlin
|
||||
compile "io.requery:requery-kotlin:$requery_version"
|
||||
|
||||
// FastClasspathScanner: classpath scanning
|
||||
compile 'io.github.lukehutch:fast-classpath-scanner:2.0.21'
|
||||
|
||||
|
@ -412,7 +412,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
private fun makeServices(): MutableList<Any> {
|
||||
checkpointStorage = DBCheckpointStorage()
|
||||
_services = ServiceHubInternalImpl()
|
||||
attachments = NodeAttachmentService(configuration.dataSourceProperties, services.monitoringService.metrics, configuration.database)
|
||||
attachments = NodeAttachmentService(services.monitoringService.metrics)
|
||||
val legalIdentity = obtainIdentity("identity", configuration.myLegalName)
|
||||
network = makeMessagingService(legalIdentity)
|
||||
info = makeInfo(legalIdentity)
|
||||
@ -749,6 +749,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
protected open fun generateKeyPair() = cryptoGenerateKeyPair()
|
||||
|
||||
private inner class ServiceHubInternalImpl : ServiceHubInternal, SingletonSerializeAsToken() {
|
||||
|
||||
private val hibernateConfig by lazy { HibernateConfiguration(schemaService, configuration.database ?: Properties(), { identityService }) }
|
||||
|
||||
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
|
||||
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage()
|
||||
override val auditService = DummyAuditService()
|
||||
@ -756,9 +759,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
override val validatedTransactions = makeTransactionStorage()
|
||||
override val transactionVerifierService by lazy { makeTransactionVerifierService() }
|
||||
override val networkMapCache by lazy { InMemoryNetworkMapCache(this) }
|
||||
override val vaultService by lazy { NodeVaultService(this, configuration.dataSourceProperties, configuration.database) }
|
||||
override val vaultService by lazy { NodeVaultService(this) }
|
||||
override val vaultQueryService by lazy {
|
||||
HibernateVaultQueryImpl(HibernateConfiguration(schemaService, configuration.database ?: Properties(), { identityService }), vaultService)
|
||||
HibernateVaultQueryImpl(hibernateConfig, vaultService)
|
||||
}
|
||||
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
||||
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.schemas.converters.AbstractPartyToX500NameAsStringConverte
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import net.corda.node.utilities.parserTransactionIsolationLevel
|
||||
import org.hibernate.SessionFactory
|
||||
import org.hibernate.boot.MetadataSources
|
||||
import org.hibernate.boot.model.naming.Identifier
|
||||
@ -28,6 +29,8 @@ class HibernateConfiguration(val schemaService: SchemaService, val databasePrope
|
||||
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
|
||||
val sessionFactories = ConcurrentHashMap<MappedSchema, SessionFactory>()
|
||||
|
||||
private val transactionIsolationLevel = parserTransactionIsolationLevel(databaseProperties.getProperty("transactionIsolationLevel") ?:"")
|
||||
|
||||
init {
|
||||
schemaService.schemaOptions.map { it.key }.forEach { mappedSchema ->
|
||||
sessionFactories.computeIfAbsent(mappedSchema, { makeSessionFactoryForSchema(mappedSchema) })
|
||||
@ -60,6 +63,7 @@ class HibernateConfiguration(val schemaService: SchemaService, val databasePrope
|
||||
val config = Configuration(metadataSources).setProperty("hibernate.connection.provider_class", HibernateConfiguration.NodeDatabaseConnectionProvider::class.java.name)
|
||||
.setProperty("hibernate.hbm2ddl.auto", if (databaseProperties.getProperty("initDatabase","true") == "true") "update" else "validate")
|
||||
.setProperty("hibernate.format_sql", "true")
|
||||
.setProperty("hibernate.connection.isolation", transactionIsolationLevel.toString())
|
||||
|
||||
schemas.forEach { schema ->
|
||||
// TODO: require mechanism to set schemaOptions (databaseSchema, tablePrefix) which are not global to session
|
||||
|
@ -1,146 +0,0 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import io.requery.EntityCache
|
||||
import io.requery.TransactionIsolation
|
||||
import io.requery.TransactionListener
|
||||
import io.requery.cache.WeakEntityCache
|
||||
import io.requery.meta.EntityModel
|
||||
import io.requery.sql.*
|
||||
import io.requery.sql.platform.H2
|
||||
import io.requery.util.function.Function
|
||||
import io.requery.util.function.Supplier
|
||||
import net.corda.core.schemas.requery.converters.InstantConverter
|
||||
import net.corda.core.schemas.requery.converters.SecureHashConverter
|
||||
import net.corda.core.schemas.requery.converters.StateRefConverter
|
||||
import net.corda.core.schemas.requery.converters.VaultStateStatusConverter
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executor
|
||||
import javax.sql.DataSource
|
||||
|
||||
/**
|
||||
* Requery KotlinConfiguration wrapper class to enable us to pass in an existing database connection and
|
||||
* associated transaction context.
|
||||
*/
|
||||
class KotlinConfigurationTransactionWrapper(private val model: EntityModel,
|
||||
dataSource: DataSource,
|
||||
private val mapping: Mapping? = null,
|
||||
private val platform: Platform? = null,
|
||||
private val cache: EntityCache = WeakEntityCache(),
|
||||
private val useDefaultLogging: Boolean = false,
|
||||
private val statementCacheSize: Int = 0,
|
||||
private val batchUpdateSize: Int = 64,
|
||||
private val quoteTableNames: Boolean = false,
|
||||
private val quoteColumnNames: Boolean = false,
|
||||
private val tableTransformer: Function<String, String>? = null,
|
||||
private val columnTransformer: Function<String, String>? = null,
|
||||
private val transactionMode: TransactionMode = TransactionMode.NONE,
|
||||
private val transactionIsolation: TransactionIsolation? = null,
|
||||
private val statementListeners: Set<StatementListener> = LinkedHashSet(),
|
||||
private val entityStateListeners: Set<EntityStateListener<Any>> = LinkedHashSet(),
|
||||
private val transactionListeners: Set<Supplier<TransactionListener>> = LinkedHashSet(),
|
||||
private val writeExecutor: Executor? = null) : Configuration {
|
||||
|
||||
private val connectionProvider = CordaDataSourceConnectionProvider(dataSource)
|
||||
|
||||
override fun getBatchUpdateSize(): Int {
|
||||
return batchUpdateSize
|
||||
}
|
||||
|
||||
override fun getConnectionProvider(): ConnectionProvider? {
|
||||
return connectionProvider
|
||||
}
|
||||
|
||||
override fun getCache(): EntityCache? {
|
||||
return cache
|
||||
}
|
||||
|
||||
override fun getEntityStateListeners(): Set<EntityStateListener<Any>> {
|
||||
return entityStateListeners
|
||||
}
|
||||
|
||||
override fun getMapping(): Mapping? {
|
||||
// TODO: database platform provider to become configurable and parameterised into this configuration
|
||||
val customMapping = GenericMapping(H2())
|
||||
|
||||
// register our custom converters
|
||||
val instantConverter = InstantConverter()
|
||||
customMapping.addConverter(instantConverter, instantConverter.mappedType)
|
||||
val vaultStateStatusConverter = VaultStateStatusConverter()
|
||||
customMapping.addConverter(vaultStateStatusConverter, vaultStateStatusConverter.mappedType)
|
||||
customMapping.addConverter(StateRefConverter(), StateRefConverter::getMappedType.javaClass)
|
||||
customMapping.addConverter(SecureHashConverter(), SecureHashConverter::getMappedType.javaClass)
|
||||
|
||||
return customMapping
|
||||
}
|
||||
|
||||
override fun getModel(): EntityModel {
|
||||
return model
|
||||
}
|
||||
|
||||
override fun getPlatform(): Platform? {
|
||||
return platform
|
||||
}
|
||||
|
||||
override fun getQuoteTableNames(): Boolean {
|
||||
return quoteTableNames
|
||||
}
|
||||
|
||||
override fun getQuoteColumnNames(): Boolean {
|
||||
return quoteColumnNames
|
||||
}
|
||||
|
||||
override fun getTableTransformer(): Function<String, String>? {
|
||||
return tableTransformer
|
||||
}
|
||||
|
||||
override fun getColumnTransformer(): Function<String, String>? {
|
||||
return columnTransformer
|
||||
}
|
||||
|
||||
override fun getStatementCacheSize(): Int {
|
||||
return statementCacheSize
|
||||
}
|
||||
|
||||
override fun getStatementListeners(): Set<StatementListener>? {
|
||||
return statementListeners
|
||||
}
|
||||
|
||||
override fun getTransactionMode(): TransactionMode? {
|
||||
return transactionMode
|
||||
}
|
||||
|
||||
override fun getTransactionIsolation(): TransactionIsolation? {
|
||||
return transactionIsolation
|
||||
}
|
||||
|
||||
override fun getTransactionListenerFactories(): Set<Supplier<TransactionListener>>? {
|
||||
return transactionListeners
|
||||
}
|
||||
|
||||
override fun getUseDefaultLogging(): Boolean {
|
||||
return useDefaultLogging
|
||||
}
|
||||
|
||||
override fun getWriteExecutor(): Executor? {
|
||||
return writeExecutor
|
||||
}
|
||||
|
||||
class CordaDataSourceConnectionProvider(val dataSource: DataSource) : ConnectionProvider {
|
||||
override fun getConnection(): Connection = CordaConnection(DatabaseTransactionManager.current().connection)
|
||||
}
|
||||
|
||||
class CordaConnection(val connection: Connection) : Connection by connection {
|
||||
override fun close() {
|
||||
// TODO: address requery auto-closing the connection in SchemaModifier upon table creation
|
||||
// https://github.com/requery/requery/issues/424
|
||||
}
|
||||
|
||||
override fun setAutoCommit(autoCommit: Boolean) {
|
||||
// TODO: address requery bug in ConnectionTransaction commit()
|
||||
// https://github.com/requery/requery/issues/423
|
||||
connection.autoCommit = false
|
||||
}
|
||||
}
|
||||
}
|
@ -1,65 +0,0 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import io.requery.Persistable
|
||||
import io.requery.TransactionIsolation
|
||||
import io.requery.meta.EntityModel
|
||||
import io.requery.sql.KotlinEntityDataStore
|
||||
import io.requery.sql.SchemaModifier
|
||||
import io.requery.sql.TableCreationMode
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class RequeryConfiguration(val properties: Properties, val useDefaultLogging: Boolean = false, val databaseProperties: Properties) {
|
||||
|
||||
companion object {
|
||||
val logger = loggerFor<RequeryConfiguration>()
|
||||
}
|
||||
|
||||
// TODO:
|
||||
// 1. schemaService schemaOptions needs to be applied: eg. default schema, table prefix
|
||||
// 2. set other generic database configuration options: show_sql, format_sql
|
||||
// 3. Configure Requery Database platform specific features (see http://requery.github.io/javadoc/io/requery/sql/Platform.html)
|
||||
// 4. Configure Cache Manager and Cache Provider and set in Requery Configuration (see http://requery.github.io/javadoc/io/requery/EntityCache.html)
|
||||
// 5. Consider database schema deployment/upgrade strategies to replace dynamic table creation.
|
||||
|
||||
// Note: Annotations are pre-processed using (kapt) so no need to register dynamically
|
||||
val config = HikariConfig(properties)
|
||||
val dataSource = HikariDataSource(config)
|
||||
|
||||
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
|
||||
private val sessionFactories = ConcurrentHashMap<EntityModel, KotlinEntityDataStore<Persistable>>()
|
||||
|
||||
fun sessionForModel(model: EntityModel): KotlinEntityDataStore<Persistable> {
|
||||
return sessionFactories.computeIfAbsent(model, { makeSessionFactoryForModel(it) })
|
||||
}
|
||||
|
||||
fun makeSessionFactoryForModel(model: EntityModel): KotlinEntityDataStore<Persistable> {
|
||||
val configuration = KotlinConfigurationTransactionWrapper(model, dataSource, useDefaultLogging = this.useDefaultLogging)
|
||||
val tables = SchemaModifier(configuration)
|
||||
if (databaseProperties.getProperty("initDatabase","true") == "true" ) {
|
||||
val mode = TableCreationMode.CREATE_NOT_EXISTS
|
||||
tables.createTables(mode)
|
||||
}
|
||||
return KotlinEntityDataStore(configuration)
|
||||
}
|
||||
|
||||
// TODO: remove once Requery supports QUERY WITH COMPOSITE_KEY IN
|
||||
fun jdbcSession(): Connection = DatabaseTransactionManager.current().connection
|
||||
}
|
||||
|
||||
fun parserTransactionIsolationLevel(property: String?) : TransactionIsolation =
|
||||
when (property) {
|
||||
"none" -> TransactionIsolation.NONE
|
||||
"readUncommitted" -> TransactionIsolation.READ_UNCOMMITTED
|
||||
"readCommitted" -> TransactionIsolation.READ_COMMITTED
|
||||
"repeatableRead" -> TransactionIsolation.REPEATABLE_READ
|
||||
"serializable" -> TransactionIsolation.SERIALIZABLE
|
||||
else -> {
|
||||
TransactionIsolation.REPEATABLE_READ
|
||||
}
|
||||
}
|
@ -12,41 +12,50 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.AttachmentStorage
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.persistence.schemas.requery.Models
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.FilterInputStream
|
||||
import java.io.IOException
|
||||
import java.io.InputStream
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import net.corda.node.utilities.NODE_DATABASE_PREFIX
|
||||
import java.io.*
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Paths
|
||||
import java.util.*
|
||||
import java.util.jar.JarInputStream
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.*
|
||||
|
||||
/**
|
||||
* Stores attachments in H2 database.
|
||||
* Stores attachments using Hibernate to database.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeAttachmentService(dataSourceProperties: Properties, metrics: MetricRegistry, databaseProperties: Properties?)
|
||||
: AttachmentStorage, SingletonSerializeAsToken() {
|
||||
class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, SingletonSerializeAsToken() {
|
||||
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}attachments",
|
||||
indexes = arrayOf(Index(name = "att_id_idx", columnList = "att_id")))
|
||||
class DBAttachment(
|
||||
@Id
|
||||
@Column(name = "att_id", length = 65535)
|
||||
var attId: String,
|
||||
|
||||
@Column(name = "content")
|
||||
@Lob
|
||||
var content: ByteArray
|
||||
) : Serializable
|
||||
|
||||
companion object {
|
||||
private val log = loggerFor<NodeAttachmentService>()
|
||||
}
|
||||
|
||||
val configuration = RequeryConfiguration(dataSourceProperties, databaseProperties = databaseProperties ?: Properties())
|
||||
val session = configuration.sessionForModel(Models.PERSISTENCE)
|
||||
|
||||
@VisibleForTesting
|
||||
var checkAttachmentsOnLoad = true
|
||||
|
||||
private val attachmentCount = metrics.counter("Attachments")
|
||||
|
||||
init {
|
||||
session.withTransaction {
|
||||
attachmentCount.inc(session.count(AttachmentEntity::class).get().value().toLong())
|
||||
}
|
||||
val session = DatabaseTransactionManager.current().session
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
criteriaQuery.select(criteriaBuilder.count(criteriaQuery.from(NodeAttachmentService.DBAttachment::class.java)))
|
||||
val count = session.createQuery(criteriaQuery).singleResult
|
||||
attachmentCount.inc(count)
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
@ -128,16 +137,13 @@ class NodeAttachmentService(dataSourceProperties: Properties, metrics: MetricReg
|
||||
|
||||
}
|
||||
|
||||
override fun openAttachment(id: SecureHash): Attachment? = session.withTransaction {
|
||||
try {
|
||||
session.select(AttachmentEntity::class)
|
||||
.where(AttachmentEntity.ATT_ID.eq(id))
|
||||
.get()
|
||||
.single()
|
||||
} catch (e: NoSuchElementException) {
|
||||
null
|
||||
override fun openAttachment(id: SecureHash): Attachment? {
|
||||
val attachment = DatabaseTransactionManager.current().session.get(NodeAttachmentService.DBAttachment::class.java, id.toString())
|
||||
attachment?.let {
|
||||
return AttachmentImpl(id, { attachment.content }, checkAttachmentsOnLoad)
|
||||
}
|
||||
}?.run { AttachmentImpl(id, { content }, checkAttachmentsOnLoad) }
|
||||
return null
|
||||
}
|
||||
|
||||
// TODO: PLT-147: The attachment should be randomised to prevent brute force guessing and thus privacy leaks.
|
||||
override fun importAttachment(jar: InputStream): SecureHash {
|
||||
@ -153,25 +159,21 @@ class NodeAttachmentService(dataSourceProperties: Properties, metrics: MetricReg
|
||||
checkIsAValidJAR(ByteArrayInputStream(bytes))
|
||||
val id = SecureHash.SHA256(hs.hash().asBytes())
|
||||
|
||||
val count = session.withTransaction {
|
||||
session.count(AttachmentEntity::class)
|
||||
.where(AttachmentEntity.ATT_ID.eq(id))
|
||||
.get().value()
|
||||
}
|
||||
|
||||
val session = DatabaseTransactionManager.current().session
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
val attachments = criteriaQuery.from(NodeAttachmentService.DBAttachment::class.java)
|
||||
criteriaQuery.select(criteriaBuilder.count(criteriaQuery.from(NodeAttachmentService.DBAttachment::class.java)))
|
||||
criteriaQuery.where(criteriaBuilder.equal(attachments.get<String>(DBAttachment::attId.name), id.toString()))
|
||||
val count = session.createQuery(criteriaQuery).singleResult
|
||||
if (count > 0) {
|
||||
throw FileAlreadyExistsException(id.toString())
|
||||
}
|
||||
|
||||
session.withTransaction {
|
||||
val attachment = AttachmentEntity()
|
||||
attachment.attId = id
|
||||
attachment.content = bytes
|
||||
session.insert(attachment)
|
||||
}
|
||||
val attachment = NodeAttachmentService.DBAttachment(attId = id.toString(), content = bytes)
|
||||
session.save(attachment)
|
||||
|
||||
attachmentCount.inc()
|
||||
|
||||
log.info("Stored new attachment $id")
|
||||
|
||||
return id
|
||||
|
@ -14,6 +14,7 @@ import net.corda.node.services.keys.PersistentKeyManagementService
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.persistence.DBTransactionMappingStorage
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.vault.VaultSchemaV1
|
||||
|
||||
@ -36,7 +37,9 @@ class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaS
|
||||
DBTransactionMappingStorage.DBTransactionMapping::class.java,
|
||||
PersistentKeyManagementService.PersistentKey::class.java,
|
||||
PersistentUniquenessProvider.PersistentUniqueness::class.java,
|
||||
NodeSchedulerService.PersistentScheduledState::class.java
|
||||
PersistentUniquenessProvider.PersistentUniqueness::class.java,
|
||||
NodeSchedulerService.PersistentScheduledState::class.java,
|
||||
NodeAttachmentService.DBAttachment::class.java
|
||||
))
|
||||
|
||||
// Required schemas are those used by internal Corda services
|
||||
@ -46,7 +49,6 @@ class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaS
|
||||
Pair(VaultSchemaV1, SchemaService.SchemaOptions()),
|
||||
Pair(NodeServicesV1, SchemaService.SchemaOptions()))
|
||||
|
||||
|
||||
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas.plus(customSchemas.map {
|
||||
mappedSchema -> Pair(mappedSchema, SchemaService.SchemaOptions())
|
||||
})
|
||||
|
@ -191,7 +191,7 @@ object BFTSMaRt {
|
||||
}
|
||||
|
||||
override fun getStateManager() = stateManagerOverride
|
||||
// TODO: Use Requery with proper DB schema instead of JDBCHashMap.
|
||||
// TODO: Use proper DB schema instead of JDBCHashMap.
|
||||
// Must be initialised before ServiceReplica is started
|
||||
private val commitLog = services.database.transaction { JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(tableName) }
|
||||
private val replica = run {
|
||||
|
@ -2,15 +2,11 @@ package net.corda.node.services.vault
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import io.requery.PersistenceException
|
||||
import io.requery.kotlin.eq
|
||||
import io.requery.query.RowExpression
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.containsAny
|
||||
import net.corda.core.crypto.toBase58String
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.tee
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.StatesNotAvailableException
|
||||
@ -21,6 +17,7 @@ import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.Sort
|
||||
import net.corda.core.node.services.vault.SortAttribute
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SerializationDefaults.STORAGE_CONTEXT
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
@ -28,22 +25,15 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.database.parserTransactionIsolationLevel
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.vault.schemas.requery.Models
|
||||
import net.corda.node.services.vault.schemas.requery.VaultSchema
|
||||
import net.corda.node.services.vault.schemas.requery.VaultStatesEntity
|
||||
import net.corda.node.services.vault.schemas.requery.VaultTxnNoteEntity
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import net.corda.node.utilities.bufferUntilDatabaseCommit
|
||||
import net.corda.node.utilities.wrapWithDatabaseTransaction
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.persistence.criteria.Predicate
|
||||
|
||||
@ -54,23 +44,15 @@ import javax.persistence.criteria.Predicate
|
||||
* This class needs database transactions to be in-flight during method calls and init, and will throw exceptions if
|
||||
* this is not the case.
|
||||
*
|
||||
* TODO: move query / filter criteria into the database query.
|
||||
* TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time.
|
||||
* TODO: have transaction storage do some caching.
|
||||
*/
|
||||
class NodeVaultService(private val services: ServiceHub, dataSourceProperties: Properties, databaseProperties: Properties?) : SingletonSerializeAsToken(), VaultService {
|
||||
class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsToken(), VaultService {
|
||||
|
||||
private companion object {
|
||||
val log = loggerFor<NodeVaultService>()
|
||||
|
||||
// Define composite primary key used in Requery Expression
|
||||
val stateRefCompositeColumn: RowExpression = RowExpression.of(listOf(VaultStatesEntity.TX_ID, VaultStatesEntity.INDEX))
|
||||
}
|
||||
|
||||
val configuration = RequeryConfiguration(dataSourceProperties, databaseProperties = databaseProperties ?: Properties())
|
||||
val session = configuration.sessionForModel(Models.VAULT)
|
||||
private val transactionIsolationLevel = parserTransactionIsolationLevel(databaseProperties?.getProperty("transactionIsolationLevel") ?:"")
|
||||
|
||||
private class InnerState {
|
||||
val _updatesPublisher = PublishSubject.create<Vault.Update<ContractState>>()!!
|
||||
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update<ContractState>>()!!
|
||||
@ -89,35 +71,29 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
val consumedStateRefs = update.consumed.map { it.ref }
|
||||
log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." }
|
||||
|
||||
session.withTransaction(transactionIsolationLevel) {
|
||||
producedStateRefsMap.forEach { it ->
|
||||
val state = VaultStatesEntity().apply {
|
||||
txId = it.key.txhash.toString()
|
||||
index = it.key.index
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED
|
||||
contractStateClassName = it.value.state.data.javaClass.name
|
||||
contractState = it.value.state.serialize(context = STORAGE_CONTEXT).bytes
|
||||
notaryName = it.value.state.notary.name.toString()
|
||||
recordedTime = services.clock.instant()
|
||||
}
|
||||
insert(state)
|
||||
}
|
||||
// TODO: awaiting support of UPDATE WHERE <Composite key> IN in Requery DSL
|
||||
consumedStateRefs.forEach { stateRef ->
|
||||
val queryKey = io.requery.proxy.CompositeKey(mapOf(VaultStatesEntity.TX_ID to stateRef.txhash.toString(),
|
||||
VaultStatesEntity.INDEX to stateRef.index))
|
||||
val state = findByKey(VaultStatesEntity::class, queryKey)
|
||||
state?.run {
|
||||
stateStatus = Vault.StateStatus.CONSUMED
|
||||
consumedTime = services.clock.instant()
|
||||
// remove lock (if held)
|
||||
if (lockId != null) {
|
||||
lockId = null
|
||||
lockUpdateTime = services.clock.instant()
|
||||
log.trace("Releasing soft lock on consumed state: $stateRef")
|
||||
}
|
||||
update(state)
|
||||
val session = DatabaseTransactionManager.current().session
|
||||
producedStateRefsMap.forEach { stateAndRef ->
|
||||
val state = VaultSchemaV1.VaultStates(
|
||||
notary = stateAndRef.value.state.notary,
|
||||
contractStateClassName = stateAndRef.value.state.data.javaClass.name,
|
||||
contractState = stateAndRef.value.state.serialize(context = STORAGE_CONTEXT).bytes,
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED,
|
||||
recordedTime = services.clock.instant())
|
||||
state.stateRef = PersistentStateRef(stateAndRef.key)
|
||||
session.save(state)
|
||||
}
|
||||
consumedStateRefs.forEach { stateRef ->
|
||||
val state = session.get<VaultSchemaV1.VaultStates>(VaultSchemaV1.VaultStates::class.java, PersistentStateRef(stateRef))
|
||||
state?.run {
|
||||
stateStatus = Vault.StateStatus.CONSUMED
|
||||
consumedTime = services.clock.instant()
|
||||
// remove lock (if held)
|
||||
if (lockId != null) {
|
||||
lockId = null
|
||||
lockUpdateTime = services.clock.instant()
|
||||
log.trace("Releasing soft lock on consumed state: $stateRef")
|
||||
}
|
||||
session.save(state)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -220,19 +196,25 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
processAndNotify(netDelta)
|
||||
}
|
||||
|
||||
// TODO: replace this method in favour of a VaultQuery query
|
||||
private fun loadStates(refs: Collection<StateRef>): HashSet<StateAndRef<ContractState>> {
|
||||
val states = HashSet<StateAndRef<ContractState>>()
|
||||
if (refs.isNotEmpty()) {
|
||||
session.withTransaction(transactionIsolationLevel) {
|
||||
val result = select(VaultStatesEntity::class).
|
||||
where(stateRefCompositeColumn.`in`(stateRefArgs(refs))).
|
||||
and(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED)
|
||||
result.get().forEach {
|
||||
val txHash = SecureHash.parse(it.txId)
|
||||
val index = it.index
|
||||
val state = it.contractState.deserialize<TransactionState<ContractState>>(context = STORAGE_CONTEXT)
|
||||
states.add(StateAndRef(state, StateRef(txHash, index)))
|
||||
}
|
||||
val session = DatabaseTransactionManager.current().session
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val statusPredicate = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.UNCONSUMED)
|
||||
val persistentStateRefs = refs.map { PersistentStateRef(it.txhash.bytes.toHexString(), it.index) }
|
||||
val compositeKey = vaultStates.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name)
|
||||
val stateRefsPredicate = criteriaBuilder.and(compositeKey.`in`(persistentStateRefs))
|
||||
criteriaQuery.where(statusPredicate, stateRefsPredicate)
|
||||
val results = session.createQuery(criteriaQuery).resultList
|
||||
results.asSequence().forEach {
|
||||
val txHash = SecureHash.parse(it.stateRef?.txId!!)
|
||||
val index = it.stateRef?.index!!
|
||||
val state = it.contractState.deserialize<TransactionState<ContractState>>(context = STORAGE_CONTEXT)
|
||||
states.add(StateAndRef(state, StateRef(txHash, index)))
|
||||
}
|
||||
}
|
||||
return states
|
||||
@ -251,85 +233,105 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
}
|
||||
|
||||
override fun addNoteToTransaction(txnId: SecureHash, noteText: String) {
|
||||
session.withTransaction(transactionIsolationLevel) {
|
||||
val txnNoteEntity = VaultTxnNoteEntity()
|
||||
txnNoteEntity.txId = txnId.toString()
|
||||
txnNoteEntity.note = noteText
|
||||
insert(txnNoteEntity)
|
||||
}
|
||||
val txnNoteEntity = VaultSchemaV1.VaultTxnNote(txnId.toString(), noteText)
|
||||
DatabaseTransactionManager.current().session.save(txnNoteEntity)
|
||||
}
|
||||
|
||||
override fun getTransactionNotes(txnId: SecureHash): Iterable<String> {
|
||||
return session.withTransaction(transactionIsolationLevel) {
|
||||
(select(VaultSchema.VaultTxnNote::class) where (VaultSchema.VaultTxnNote::txId eq txnId.toString())).get().asIterable().map { it.note }
|
||||
}
|
||||
val session = DatabaseTransactionManager.current().session
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultTxnNote::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultTxnNote::class.java)
|
||||
val txIdPredicate = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultTxnNote::txId.name), txnId.toString())
|
||||
criteriaQuery.where(txIdPredicate)
|
||||
val results = session.createQuery(criteriaQuery).resultList
|
||||
return results.asIterable().map { it.note }
|
||||
}
|
||||
|
||||
@Throws(StatesNotAvailableException::class)
|
||||
override fun softLockReserve(lockId: UUID, stateRefs: NonEmptySet<StateRef>) {
|
||||
val softLockTimestamp = services.clock.instant()
|
||||
val stateRefArgs = stateRefArgs(stateRefs)
|
||||
try {
|
||||
session.withTransaction(transactionIsolationLevel) {
|
||||
val updatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, lockId.toString())
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, softLockTimestamp)
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and((VaultStatesEntity.LOCK_ID eq lockId.toString()) or (VaultStatesEntity.LOCK_ID.isNull()))
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs)).get().value()
|
||||
if (updatedRows > 0 && updatedRows == stateRefs.size) {
|
||||
log.trace("Reserving soft lock states for $lockId: $stateRefs")
|
||||
FlowStateMachineImpl.currentStateMachine()?.hasSoftLockedStates = true
|
||||
} else {
|
||||
// revert partial soft locks
|
||||
val revertUpdatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.where(VaultStatesEntity.LOCK_UPDATE_TIME eq softLockTimestamp)
|
||||
.and(VaultStatesEntity.LOCK_ID eq lockId.toString())
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs)).get().value()
|
||||
if (revertUpdatedRows > 0) {
|
||||
log.trace("Reverting $revertUpdatedRows partially soft locked states for $lockId")
|
||||
}
|
||||
throw StatesNotAvailableException("Attempted to reserve $stateRefs for $lockId but only $updatedRows rows available")
|
||||
val session = DatabaseTransactionManager.current().session
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val criteriaUpdate = criteriaBuilder.createCriteriaUpdate(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaUpdate.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val stateStatusPredication = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.UNCONSUMED)
|
||||
val lockIdPredicate = criteriaBuilder.or(vaultStates.get<String>(VaultSchemaV1.VaultStates::lockId.name).isNull,
|
||||
criteriaBuilder.equal(vaultStates.get<String>(VaultSchemaV1.VaultStates::lockId.name), lockId.toString()))
|
||||
val persistentStateRefs = stateRefs.map { PersistentStateRef(it.txhash.bytes.toHexString(), it.index) }
|
||||
val compositeKey = vaultStates.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name)
|
||||
val stateRefsPredicate = criteriaBuilder.and(compositeKey.`in`(persistentStateRefs))
|
||||
criteriaUpdate.set(vaultStates.get<String>(VaultSchemaV1.VaultStates::lockId.name), lockId.toString())
|
||||
criteriaUpdate.set(vaultStates.get<Instant>(VaultSchemaV1.VaultStates::lockUpdateTime.name), softLockTimestamp)
|
||||
criteriaUpdate.where(stateStatusPredication, lockIdPredicate, stateRefsPredicate)
|
||||
val updatedRows = session.createQuery(criteriaUpdate).executeUpdate()
|
||||
if (updatedRows > 0 && updatedRows == stateRefs.size) {
|
||||
log.trace("Reserving soft lock states for $lockId: $stateRefs")
|
||||
FlowStateMachineImpl.currentStateMachine()?.hasSoftLockedStates = true
|
||||
} else {
|
||||
// revert partial soft locks
|
||||
val criteriaRevertUpdate = criteriaBuilder.createCriteriaUpdate(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStatesRevert = criteriaRevertUpdate.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val lockIdPredicateRevert = criteriaBuilder.equal(vaultStatesRevert.get<String>(VaultSchemaV1.VaultStates::lockId.name), lockId.toString())
|
||||
val lockUpdateTime = criteriaBuilder.equal(vaultStatesRevert.get<Instant>(VaultSchemaV1.VaultStates::lockUpdateTime.name), softLockTimestamp)
|
||||
val persistentStateRefsRevert = stateRefs.map { PersistentStateRef(it.txhash.bytes.toHexString(), it.index) }
|
||||
val compositeKeyRevert = vaultStatesRevert.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name)
|
||||
val stateRefsPredicateRevert = criteriaBuilder.and(compositeKeyRevert.`in`(persistentStateRefsRevert))
|
||||
criteriaRevertUpdate.set(vaultStatesRevert.get<String>(VaultSchemaV1.VaultStates::lockId.name), criteriaBuilder.nullLiteral(String::class.java))
|
||||
criteriaRevertUpdate.where(lockUpdateTime, lockIdPredicateRevert, stateRefsPredicateRevert)
|
||||
val revertUpdatedRows = session.createQuery(criteriaRevertUpdate).executeUpdate()
|
||||
if (revertUpdatedRows > 0) {
|
||||
log.trace("Reverting $revertUpdatedRows partially soft locked states for $lockId")
|
||||
}
|
||||
throw StatesNotAvailableException("Attempted to reserve $stateRefs for $lockId but only $updatedRows rows available")
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
} catch (e: Exception) {
|
||||
log.error("""soft lock update error attempting to reserve states for $lockId and $stateRefs")
|
||||
$e.
|
||||
""")
|
||||
if (e.cause is StatesNotAvailableException) throw (e.cause as StatesNotAvailableException)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
|
||||
val softLockTimestamp = services.clock.instant()
|
||||
val session = DatabaseTransactionManager.current().session
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
if (stateRefs == null) {
|
||||
session.withTransaction(transactionIsolationLevel) {
|
||||
val update = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, services.clock.instant())
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and(VaultStatesEntity.LOCK_ID eq lockId.toString()).get()
|
||||
if (update.value() > 0) {
|
||||
log.trace("Releasing ${update.value()} soft locked states for $lockId")
|
||||
}
|
||||
val criteriaUpdate = criteriaBuilder.createCriteriaUpdate(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaUpdate.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val stateStatusPredication = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.UNCONSUMED)
|
||||
val lockIdPredicate = criteriaBuilder.equal(vaultStates.get<String>(VaultSchemaV1.VaultStates::lockId.name), lockId.toString())
|
||||
criteriaUpdate.set<String>(vaultStates.get<String>(VaultSchemaV1.VaultStates::lockId.name), criteriaBuilder.nullLiteral(String::class.java))
|
||||
criteriaUpdate.set(vaultStates.get<Instant>(VaultSchemaV1.VaultStates::lockUpdateTime.name), softLockTimestamp)
|
||||
criteriaUpdate.where(stateStatusPredication, lockIdPredicate)
|
||||
val update = session.createQuery(criteriaUpdate).executeUpdate()
|
||||
if (update > 0) {
|
||||
log.trace("Releasing $update soft locked states for $lockId")
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
session.withTransaction(transactionIsolationLevel) {
|
||||
val updatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, services.clock.instant())
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and(VaultStatesEntity.LOCK_ID eq lockId.toString())
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs(stateRefs))).get().value()
|
||||
if (updatedRows > 0) {
|
||||
log.trace("Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefs")
|
||||
}
|
||||
val criteriaUpdate = criteriaBuilder.createCriteriaUpdate(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaUpdate.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val stateStatusPredication = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.UNCONSUMED)
|
||||
val lockIdPredicate = criteriaBuilder.equal(vaultStates.get<String>(VaultSchemaV1.VaultStates::lockId.name), lockId.toString())
|
||||
val persistentStateRefs = stateRefs.map { PersistentStateRef(it.txhash.bytes.toHexString(), it.index) }
|
||||
val compositeKey = vaultStates.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name)
|
||||
val stateRefsPredicate = criteriaBuilder.and(compositeKey.`in`(persistentStateRefs))
|
||||
criteriaUpdate.set<String>(vaultStates.get<String>(VaultSchemaV1.VaultStates::lockId.name), criteriaBuilder.nullLiteral(String::class.java))
|
||||
criteriaUpdate.set(vaultStates.get<Instant>(VaultSchemaV1.VaultStates::lockUpdateTime.name), softLockTimestamp)
|
||||
criteriaUpdate.where(stateStatusPredication, lockIdPredicate, stateRefsPredicate)
|
||||
val updatedRows = session.createQuery(criteriaUpdate).executeUpdate()
|
||||
if (updatedRows > 0) {
|
||||
log.trace("Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefs")
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
} catch (e: Exception) {
|
||||
log.error("""soft lock update error attempting to release states for $lockId and $stateRefs")
|
||||
$e.
|
||||
""")
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -462,11 +464,4 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
is LinearState -> state.isRelevant(ourKeys)
|
||||
else -> ourKeys.intersect(state.participants.map { it.owningKey }).isNotEmpty()
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to generate a string formatted list of Composite Keys for Requery Expression clause
|
||||
*/
|
||||
private fun stateRefArgs(stateRefs: Iterable<StateRef>): List<List<Any>> {
|
||||
return stateRefs.map { listOf("'${it.txhash}'", it.index) }
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,9 @@ import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import org.hibernate.annotations.Generated
|
||||
import org.hibernate.annotations.GenerationTime
|
||||
import java.io.Serializable
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.persistence.*
|
||||
@ -22,7 +25,7 @@ object VaultSchema
|
||||
*/
|
||||
@CordaSerializable
|
||||
object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, version = 1,
|
||||
mappedTypes = listOf(VaultStates::class.java, VaultLinearStates::class.java, VaultFungibleStates::class.java)) {
|
||||
mappedTypes = listOf(VaultStates::class.java, VaultLinearStates::class.java, VaultFungibleStates::class.java, VaultTxnNote::class.java)) {
|
||||
@Entity
|
||||
@Table(name = "vault_states",
|
||||
indexes = arrayOf(Index(name = "state_status_idx", columnList = "state_status")))
|
||||
@ -50,16 +53,16 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
|
||||
|
||||
/** refers to timestamp recorded upon entering CONSUMED state */
|
||||
@Column(name = "consumed_timestamp", nullable = true)
|
||||
var consumedTime: Instant?,
|
||||
var consumedTime: Instant? = null,
|
||||
|
||||
/** used to denote a state has been soft locked (to prevent double spend)
|
||||
* will contain a temporary unique [UUID] obtained from a flow session */
|
||||
@Column(name = "lock_id", nullable = true)
|
||||
var lockId: String,
|
||||
var lockId: String? = null,
|
||||
|
||||
/** refers to the last time a lock was taken (reserved) or updated (released, re-reserved) */
|
||||
@Column(name = "lock_timestamp", nullable = true)
|
||||
var lockUpdateTime: Instant?
|
||||
var lockUpdateTime: Instant? = null
|
||||
) : PersistentState()
|
||||
|
||||
@Entity
|
||||
@ -133,4 +136,23 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
|
||||
issuerRef = _issuerRef.bytes,
|
||||
participants = _participants.toMutableSet())
|
||||
}
|
||||
|
||||
@Entity
|
||||
@Table(name = "vault_transaction_notes",
|
||||
indexes = arrayOf(Index(name = "seq_no_index", columnList = "seq_no"),
|
||||
Index(name = "transaction_id_index", columnList = "transaction_id")))
|
||||
class VaultTxnNote(
|
||||
@Id
|
||||
@GeneratedValue
|
||||
@Column(name = "seq_no")
|
||||
var seqNo: Int,
|
||||
|
||||
@Column(name = "transaction_id", length = 64)
|
||||
var txId: String,
|
||||
|
||||
@Column(name = "note")
|
||||
var note: String
|
||||
) : Serializable {
|
||||
constructor(txId: String, note: String) : this(0, txId, note)
|
||||
}
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
package net.corda.node.services.vault;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import kotlin.Pair;
|
||||
import kotlin.*;
|
||||
import net.corda.contracts.DealState;
|
||||
import net.corda.contracts.asset.Cash;
|
||||
import net.corda.contracts.asset.CashUtilities;
|
||||
@ -19,7 +19,7 @@ import net.corda.core.node.services.vault.QueryCriteria.VaultCustomQueryCriteria
|
||||
import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria;
|
||||
import net.corda.core.schemas.MappedSchema;
|
||||
import net.corda.core.utilities.OpaqueBytes;
|
||||
import net.corda.node.utilities.CordaPersistence;
|
||||
import net.corda.node.utilities.*;
|
||||
import net.corda.schemas.CashSchemaV1;
|
||||
import net.corda.testing.TestConstants;
|
||||
import net.corda.testing.TestDependencyInjectionBase;
|
||||
@ -66,6 +66,7 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
Set<MappedSchema> requiredSchemas = new HashSet<>();
|
||||
requiredSchemas.add(CashSchemaV1.INSTANCE);
|
||||
IdentityService identitySvc = makeTestIdentityService();
|
||||
@SuppressWarnings("unchecked")
|
||||
Pair<CordaPersistence, MockServices> databaseAndServices = makeTestDatabaseAndMockServices(requiredSchemas, keys, () -> identitySvc);
|
||||
issuerServices = new MockServices(getDUMMY_CASH_ISSUER_KEY(), getBOC_KEY());
|
||||
database = databaseAndServices.getFirst();
|
||||
@ -89,9 +90,10 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
@Test
|
||||
public void unconsumedLinearStates() throws VaultQueryException {
|
||||
database.transaction(tx -> {
|
||||
|
||||
VaultFiller.fillWithSomeTestLinearStates(services, 3);
|
||||
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
// DOCSTART VaultJavaQueryExample0
|
||||
Vault.Page<LinearState> results = vaultQuerySvc.queryBy(LinearState.class);
|
||||
// DOCEND VaultJavaQueryExample0
|
||||
@ -104,11 +106,12 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
|
||||
@Test
|
||||
public void unconsumedStatesForStateRefsSortedByTxnId() {
|
||||
Vault<LinearState> issuedStates =
|
||||
database.transaction(tx -> {
|
||||
VaultFiller.fillWithSomeTestLinearStates(services, 8);
|
||||
return VaultFiller.fillWithSomeTestLinearStates(services, 2);
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
|
||||
VaultFiller.fillWithSomeTestLinearStates(services, 8);
|
||||
Vault<LinearState> issuedStates = VaultFiller.fillWithSomeTestLinearStates(services, 2);
|
||||
|
||||
Stream<StateRef> stateRefsStream = StreamSupport.stream(issuedStates.getStates().spliterator(), false).map(StateAndRef::getRef);
|
||||
List<StateRef> stateRefs = stateRefsStream.collect(Collectors.toList());
|
||||
|
||||
@ -129,13 +132,11 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
|
||||
@Test
|
||||
public void consumedCashStates() {
|
||||
Amount<Currency> amount = new Amount<>(100, Currency.getInstance("USD"));
|
||||
database.transaction(tx -> {
|
||||
|
||||
Amount<Currency> amount = new Amount<>(100, Currency.getInstance("USD"));
|
||||
|
||||
VaultFiller.fillWithSomeTestCash(services,
|
||||
new Amount<Currency>(100, Currency.getInstance("USD")),
|
||||
issuerServices,
|
||||
issuerServices,
|
||||
TestConstants.getDUMMY_NOTARY(),
|
||||
3,
|
||||
3,
|
||||
@ -143,9 +144,13 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
new OpaqueBytes("1".getBytes()),
|
||||
null,
|
||||
CashUtilities.getDUMMY_CASH_ISSUER());
|
||||
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
VaultFiller.consumeCash(services, amount, getDUMMY_NOTARY());
|
||||
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
// DOCSTART VaultJavaQueryExample1
|
||||
VaultQueryCriteria criteria = new VaultQueryCriteria(Vault.StateStatus.CONSUMED);
|
||||
Vault.Page<Cash.State> results = vaultQuerySvc.queryBy(Cash.State.class, criteria);
|
||||
@ -159,19 +164,24 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
|
||||
@Test
|
||||
public void consumedDealStatesPagedSorted() throws VaultQueryException {
|
||||
List<String> dealIds = Arrays.asList("123", "456", "789");
|
||||
@SuppressWarnings("unchecked")
|
||||
Triple<StateAndRef<LinearState>, UniqueIdentifier, Vault<DealState>> ids =
|
||||
database.transaction((DatabaseTransaction tx) -> {
|
||||
Vault<LinearState> states = VaultFiller.fillWithSomeTestLinearStates(services, 10, null);
|
||||
StateAndRef<LinearState> linearState = states.getStates().iterator().next();
|
||||
UniqueIdentifier uid = linearState.component1().getData().getLinearId();
|
||||
|
||||
Vault<DealState> dealStates = VaultFiller.fillWithSomeTestDeals(services, dealIds);
|
||||
return new Triple(linearState,uid,dealStates);
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
|
||||
Vault<LinearState> states = VaultFiller.fillWithSomeTestLinearStates(services, 10, null);
|
||||
StateAndRef<LinearState> linearState = states.getStates().iterator().next();
|
||||
UniqueIdentifier uid = linearState.component1().getData().getLinearId();
|
||||
|
||||
List<String> dealIds = Arrays.asList("123", "456", "789");
|
||||
Vault<DealState> dealStates = VaultFiller.fillWithSomeTestDeals(services, dealIds);
|
||||
|
||||
// consume states
|
||||
VaultFiller.consumeDeals(services, (List<? extends StateAndRef<? extends DealState>>) dealStates.getStates(), getDUMMY_NOTARY());
|
||||
VaultFiller.consumeLinearStates(services, Collections.singletonList(linearState), getDUMMY_NOTARY());
|
||||
|
||||
VaultFiller.consumeDeals(services, (List<? extends StateAndRef<? extends DealState>>) ids.getThird().getStates(), getDUMMY_NOTARY());
|
||||
VaultFiller.consumeLinearStates(services, Collections.singletonList(ids.getFirst()), getDUMMY_NOTARY());
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
// DOCSTART VaultJavaQueryExample2
|
||||
Vault.StateStatus status = Vault.StateStatus.CONSUMED;
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -179,7 +189,7 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
|
||||
QueryCriteria vaultCriteria = new VaultQueryCriteria(status, contractStateTypes);
|
||||
|
||||
List<UUID> linearIds = Collections.singletonList(uid.getId());
|
||||
List<UUID> linearIds = Collections.singletonList(ids.getSecond().getId());
|
||||
QueryCriteria linearCriteriaAll = new LinearStateQueryCriteria(null, linearIds);
|
||||
QueryCriteria dealCriteriaAll = new LinearStateQueryCriteria(null, null, dealIds);
|
||||
|
||||
@ -212,7 +222,9 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
VaultFiller.fillWithSomeTestCash(services, dollars100, issuerServices, TestConstants.getDUMMY_NOTARY(), 1, 1, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER());
|
||||
VaultFiller.fillWithSomeTestCash(services, dollars10, issuerServices, TestConstants.getDUMMY_NOTARY(), 1, 1, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER());
|
||||
VaultFiller.fillWithSomeTestCash(services, dollars1, issuerServices, TestConstants.getDUMMY_NOTARY(), 1, 1, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER());
|
||||
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
try {
|
||||
// DOCSTART VaultJavaQueryExample3
|
||||
QueryCriteria generalCriteria = new VaultQueryCriteria(Vault.StateStatus.ALL);
|
||||
@ -256,7 +268,9 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
new OpaqueBytes("1".getBytes()),
|
||||
null,
|
||||
getDUMMY_CASH_ISSUER());
|
||||
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
// DOCSTART VaultJavaQueryExample4
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(Cash.State.class));
|
||||
@ -276,14 +290,16 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
|
||||
@Test
|
||||
public void trackDealStatesPagedSorted() {
|
||||
List<String> dealIds = Arrays.asList("123", "456", "789");
|
||||
UniqueIdentifier uid =
|
||||
database.transaction(tx -> {
|
||||
Vault<LinearState> states = VaultFiller.fillWithSomeTestLinearStates(services, 10, null);
|
||||
UniqueIdentifier _uid = states.getStates().iterator().next().component1().getData().getLinearId();
|
||||
|
||||
VaultFiller.fillWithSomeTestDeals(services, dealIds);
|
||||
return _uid;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
|
||||
Vault<LinearState> states = VaultFiller.fillWithSomeTestLinearStates(services, 10, null);
|
||||
UniqueIdentifier uid = states.getStates().iterator().next().component1().getData().getLinearId();
|
||||
|
||||
List<String> dealIds = Arrays.asList("123", "456", "789");
|
||||
VaultFiller.fillWithSomeTestDeals(services, dealIds);
|
||||
|
||||
// DOCSTART VaultJavaQueryExample5
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Arrays.asList(DealState.class, LinearState.class));
|
||||
@ -331,6 +347,9 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
VaultFiller.fillWithSomeTestCash(services, pounds, issuerServices, TestConstants.getDUMMY_NOTARY(), 4, 4, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER());
|
||||
VaultFiller.fillWithSomeTestCash(services, swissfrancs, issuerServices, TestConstants.getDUMMY_NOTARY(), 5, 5, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER());
|
||||
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
try {
|
||||
// DOCSTART VaultJavaQueryExample21
|
||||
Field pennies = CashSchemaV1.PersistentCashState.class.getDeclaredField("pennies");
|
||||
@ -376,6 +395,9 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
VaultFiller.fillWithSomeTestCash(services, pounds, issuerServices, TestConstants.getDUMMY_NOTARY(), 4, 4, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER());
|
||||
VaultFiller.fillWithSomeTestCash(services, swissfrancs, issuerServices, TestConstants.getDUMMY_NOTARY(), 5, 5, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER());
|
||||
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
try {
|
||||
// DOCSTART VaultJavaQueryExample22
|
||||
Field pennies = CashSchemaV1.PersistentCashState.class.getDeclaredField("pennies");
|
||||
@ -434,7 +456,6 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
@SuppressWarnings("unchecked")
|
||||
public void aggregateFunctionsSumByIssuerAndCurrencyAndSortByAggregateSum() {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Amount<Currency> dollars100 = new Amount<>(100, Currency.getInstance("USD"));
|
||||
Amount<Currency> dollars200 = new Amount<>(200, Currency.getInstance("USD"));
|
||||
Amount<Currency> pounds300 = new Amount<>(300, Currency.getInstance("GBP"));
|
||||
@ -445,6 +466,9 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
VaultFiller.fillWithSomeTestCash(services, pounds300, issuerServices, TestConstants.getDUMMY_NOTARY(), 3, 3, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER());
|
||||
VaultFiller.fillWithSomeTestCash(services, pounds400, issuerServices, TestConstants.getDUMMY_NOTARY(), 4, 4, new Random(0L), new OpaqueBytes("1".getBytes()), null, getBOC().ref(new OpaqueBytes("1".getBytes())));
|
||||
|
||||
return tx;
|
||||
});
|
||||
database.transaction(tx -> {
|
||||
try {
|
||||
// DOCSTART VaultJavaQueryExample23
|
||||
Field pennies = CashSchemaV1.PersistentCashState.class.getDeclaredField("pennies");
|
||||
|
@ -74,7 +74,7 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
|
||||
database.transaction {
|
||||
hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas), makeTestDatabaseProperties(), ::makeTestIdentityService)
|
||||
services = object : MockServices(BOB_KEY, BOC_KEY, DUMMY_NOTARY_KEY) {
|
||||
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)
|
||||
override val vaultService: VaultService = makeVaultService(hibernateConfig)
|
||||
|
||||
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
|
@ -1,213 +0,0 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import io.requery.Persistable
|
||||
import io.requery.kotlin.eq
|
||||
import io.requery.sql.KotlinEntityDataStore
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.identity.AnonymousParty
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.vault.schemas.requery.Models
|
||||
import net.corda.node.services.vault.schemas.requery.VaultCashBalancesEntity
|
||||
import net.corda.node.services.vault.schemas.requery.VaultSchema
|
||||
import net.corda.node.services.vault.schemas.requery.VaultStatesEntity
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.dummyCommand
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.makeTestDatabaseProperties
|
||||
import net.corda.testing.node.makeTestIdentityService
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
class RequeryConfigurationTest : TestDependencyInjectionBase() {
|
||||
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var transactionStorage: DBTransactionStorage
|
||||
lateinit var requerySession: KotlinEntityDataStore<Persistable>
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
database = configureDatabase(dataSourceProperties, makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
|
||||
newTransactionStorage()
|
||||
newRequeryStorage(dataSourceProperties)
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transaction inserts in same DB transaction scope across two persistence engines`() {
|
||||
val txn = newTransaction()
|
||||
|
||||
database.transaction {
|
||||
transactionStorage.addTransaction(txn)
|
||||
requerySession.withTransaction {
|
||||
insert(createVaultStateEntity(txn))
|
||||
}
|
||||
}
|
||||
|
||||
database.transaction {
|
||||
Assertions.assertThat(transactionStorage.transactions).containsOnly(txn)
|
||||
requerySession.withTransaction {
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString())
|
||||
Assertions.assertThat(result.get().first().txId).isEqualTo(txn.tx.inputs[0].txhash.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transaction operations in same DB transaction scope across two persistence engines`() {
|
||||
val txn = newTransaction()
|
||||
|
||||
database.transaction {
|
||||
transactionStorage.addTransaction(txn)
|
||||
requerySession.withTransaction {
|
||||
upsert(createCashBalance())
|
||||
select(VaultSchema.VaultCashBalances::class).get().first()
|
||||
insert(createVaultStateEntity(txn))
|
||||
}
|
||||
}
|
||||
|
||||
database.transaction {
|
||||
Assertions.assertThat(transactionStorage.transactions).containsOnly(txn)
|
||||
requerySession.withTransaction {
|
||||
val cashQuery = select(VaultSchema.VaultCashBalances::class) where (VaultSchema.VaultCashBalances::currency eq "GBP")
|
||||
assertEquals(12345, cashQuery.get().first().amount)
|
||||
val stateQuery = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString())
|
||||
Assertions.assertThat(stateQuery.get().first().txId).isEqualTo(txn.tx.inputs[0].txhash.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transaction rollback in same DB transaction scope across two persistence engines`() {
|
||||
val txn = newTransaction()
|
||||
|
||||
database.transaction {
|
||||
transactionStorage.addTransaction(txn)
|
||||
requerySession.withTransaction {
|
||||
insert(createVaultStateEntity(txn))
|
||||
}
|
||||
rollback()
|
||||
}
|
||||
|
||||
database.transaction {
|
||||
Assertions.assertThat(transactionStorage.transactions).isEmpty()
|
||||
requerySession.withTransaction {
|
||||
val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString())
|
||||
Assertions.assertThat(result.get().count() == 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `bounded iteration`() {
|
||||
// insert 100 entities
|
||||
database.transaction {
|
||||
requerySession.withTransaction {
|
||||
(1..100)
|
||||
.map { newTransaction(it) }
|
||||
.forEach { insert(createVaultStateEntity(it)) }
|
||||
}
|
||||
}
|
||||
|
||||
// query entities 41..45
|
||||
database.transaction {
|
||||
requerySession.withTransaction {
|
||||
// Note: cannot specify a limit explicitly when using iterator skip & take
|
||||
val query = select(VaultSchema.VaultStates::class)
|
||||
val count = query.get().count()
|
||||
Assertions.assertThat(count).isEqualTo(100)
|
||||
val result = query.get().iterator(40, 5)
|
||||
Assertions.assertThat(result.asSequence().count()).isEqualTo(5)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test calling an arbitrary JDBC native query`() {
|
||||
val txn = newTransaction()
|
||||
|
||||
database.transaction {
|
||||
transactionStorage.addTransaction(txn)
|
||||
requerySession.withTransaction {
|
||||
insert(createVaultStateEntity(txn))
|
||||
}
|
||||
}
|
||||
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
val nativeQuery = "SELECT v.transaction_id, v.output_index FROM vault_states v WHERE v.state_status = 0"
|
||||
|
||||
database.transaction {
|
||||
val configuration = RequeryConfiguration(dataSourceProperties, true, makeTestDatabaseProperties())
|
||||
val jdbcSession = configuration.jdbcSession()
|
||||
val prepStatement = jdbcSession.prepareStatement(nativeQuery)
|
||||
val rs = prepStatement.executeQuery()
|
||||
assertTrue(rs.next())
|
||||
assertEquals(rs.getString(1), txn.tx.inputs[0].txhash.toString())
|
||||
assertEquals(rs.getInt(2), txn.tx.inputs[0].index)
|
||||
}
|
||||
}
|
||||
|
||||
private fun createVaultStateEntity(txn: SignedTransaction): VaultStatesEntity {
|
||||
val txnState = txn.tx.inputs[0]
|
||||
val state = VaultStatesEntity().apply {
|
||||
txId = txnState.txhash.toString()
|
||||
index = txnState.index
|
||||
stateStatus = Vault.StateStatus.UNCONSUMED
|
||||
contractStateClassName = DummyContract.SingleOwnerState::class.java.name
|
||||
contractState = DummyContract.SingleOwnerState(owner = AnonymousParty(MEGA_CORP_PUBKEY)).serialize().bytes
|
||||
notaryName = txn.tx.notary!!.name.toString()
|
||||
recordedTime = Instant.now()
|
||||
}
|
||||
return state
|
||||
}
|
||||
|
||||
private fun createCashBalance(): VaultCashBalancesEntity {
|
||||
val cashBalanceEntity = VaultCashBalancesEntity()
|
||||
cashBalanceEntity.currency = "GBP"
|
||||
cashBalanceEntity.amount = 12345
|
||||
return cashBalanceEntity
|
||||
}
|
||||
|
||||
private fun newTransactionStorage() {
|
||||
database.transaction {
|
||||
transactionStorage = DBTransactionStorage()
|
||||
}
|
||||
}
|
||||
|
||||
private fun newRequeryStorage(dataSourceProperties: Properties) {
|
||||
database.transaction {
|
||||
val configuration = RequeryConfiguration(dataSourceProperties, true, makeTestDatabaseProperties())
|
||||
requerySession = configuration.sessionForModel(Models.VAULT)
|
||||
}
|
||||
}
|
||||
|
||||
private fun newTransaction(index: Int = 0): SignedTransaction {
|
||||
val wtx = WireTransaction(
|
||||
inputs = listOf(StateRef(SecureHash.randomSHA256(), index)),
|
||||
attachments = emptyList(),
|
||||
outputs = emptyList(),
|
||||
commands = listOf(dummyCommand()),
|
||||
notary = DUMMY_NOTARY,
|
||||
timeWindow = null
|
||||
)
|
||||
return SignedTransaction(wtx, listOf(TransactionSignature(ByteArray(1), ALICE_PUBKEY, SignatureMetadata(1, Crypto.findSignatureScheme(ALICE_PUBKEY).schemeNumberID))))
|
||||
}
|
||||
}
|
@ -11,8 +11,10 @@ import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.node.services.MockServiceHubInternal
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.services.identity.InMemoryIdentityService
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
@ -71,7 +73,8 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
smmHasRemovedAllFlows = CountDownLatch(1)
|
||||
calls = 0
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
|
||||
val databaseProperties = makeTestDatabaseProperties()
|
||||
database = configureDatabase(dataSourceProps, databaseProperties, identitySvc = ::makeTestIdentityService)
|
||||
val identityService = InMemoryIdentityService(trustRoot = DUMMY_CA.certificate)
|
||||
val kms = MockKeyManagementService(identityService, ALICE_KEY)
|
||||
|
||||
@ -88,7 +91,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
overrideClock = testClock,
|
||||
keyManagement = kms,
|
||||
network = mockMessagingService), TestReference {
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties())
|
||||
override val vaultService: VaultService = NodeVaultService(this)
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
|
@ -59,7 +59,7 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
|
||||
|
||||
services = object : MockServices(BOB_KEY) {
|
||||
override val vaultService: VaultService get() {
|
||||
val vaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties())
|
||||
val vaultService = NodeVaultService(this)
|
||||
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
|
||||
return vaultService
|
||||
}
|
||||
|
@ -7,14 +7,13 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.internal.read
|
||||
import net.corda.core.internal.readAll
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.core.internal.write
|
||||
import net.corda.core.internal.writeLines
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.makeTestDatabaseProperties
|
||||
import net.corda.testing.node.makeTestIdentityService
|
||||
@ -25,7 +24,6 @@ import java.nio.charset.Charset
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.FileSystem
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
import java.util.jar.JarEntry
|
||||
import java.util.jar.JarOutputStream
|
||||
import kotlin.test.assertEquals
|
||||
@ -36,17 +34,13 @@ class NodeAttachmentStorageTest {
|
||||
// Use an in memory file system for testing attachment storage.
|
||||
lateinit var fs: FileSystem
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var dataSourceProperties: Properties
|
||||
lateinit var configuration: RequeryConfiguration
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
|
||||
dataSourceProperties = makeTestDataSourceProperties()
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
database = configureDatabase(dataSourceProperties, makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
|
||||
|
||||
configuration = RequeryConfiguration(dataSourceProperties, databaseProperties = makeTestDatabaseProperties())
|
||||
fs = Jimfs.newFileSystem(Configuration.unix())
|
||||
}
|
||||
|
||||
@ -61,7 +55,7 @@ class NodeAttachmentStorageTest {
|
||||
val expectedHash = testJar.readAll().sha256()
|
||||
|
||||
database.transaction {
|
||||
val storage = NodeAttachmentService(dataSourceProperties, MetricRegistry(), makeTestDatabaseProperties())
|
||||
val storage = NodeAttachmentService(MetricRegistry())
|
||||
val id = testJar.read { storage.importAttachment(it) }
|
||||
assertEquals(expectedHash, id)
|
||||
|
||||
@ -87,7 +81,7 @@ class NodeAttachmentStorageTest {
|
||||
fun `duplicates not allowed`() {
|
||||
val testJar = makeTestJar()
|
||||
database.transaction {
|
||||
val storage = NodeAttachmentService(dataSourceProperties, MetricRegistry(), makeTestDatabaseProperties())
|
||||
val storage = NodeAttachmentService(MetricRegistry())
|
||||
testJar.read {
|
||||
storage.importAttachment(it)
|
||||
}
|
||||
@ -102,19 +96,21 @@ class NodeAttachmentStorageTest {
|
||||
@Test
|
||||
fun `corrupt entry throws exception`() {
|
||||
val testJar = makeTestJar()
|
||||
val id =
|
||||
database.transaction {
|
||||
val storage = NodeAttachmentService(dataSourceProperties, MetricRegistry(), makeTestDatabaseProperties())
|
||||
val storage = NodeAttachmentService(MetricRegistry())
|
||||
val id = testJar.read { storage.importAttachment(it) }
|
||||
|
||||
// Corrupt the file in the store.
|
||||
val bytes = testJar.readAll()
|
||||
val corruptBytes = "arggghhhh".toByteArray()
|
||||
System.arraycopy(corruptBytes, 0, bytes, 0, corruptBytes.size)
|
||||
val corruptAttachment = AttachmentEntity()
|
||||
corruptAttachment.attId = id
|
||||
corruptAttachment.content = bytes
|
||||
storage.session.update(corruptAttachment)
|
||||
|
||||
val corruptAttachment = NodeAttachmentService.DBAttachment(attId = id.toString(), content = bytes)
|
||||
DatabaseTransactionManager.current().session.merge(corruptAttachment)
|
||||
id
|
||||
}
|
||||
database.transaction {
|
||||
val storage = NodeAttachmentService(MetricRegistry())
|
||||
val e = assertFailsWith<NodeAttachmentService.HashMismatchException> {
|
||||
storage.openAttachment(id)!!.open().use { it.readBytes() }
|
||||
}
|
||||
@ -131,7 +127,7 @@ class NodeAttachmentStorageTest {
|
||||
@Test
|
||||
fun `non jar rejected`() {
|
||||
database.transaction {
|
||||
val storage = NodeAttachmentService(dataSourceProperties, MetricRegistry(), makeTestDatabaseProperties())
|
||||
val storage = NodeAttachmentService(MetricRegistry())
|
||||
val path = fs.getPath("notajar")
|
||||
path.writeLines(listOf("Hey", "there!"))
|
||||
path.read {
|
||||
|
@ -84,9 +84,9 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `states not local to instance`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val w1 = vaultQuery.queryBy<Cash.State>().states
|
||||
assertThat(w1).hasSize(3)
|
||||
|
||||
@ -112,9 +112,9 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `states for refs`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val w1 = vaultQuery.queryBy<Cash.State>().states
|
||||
assertThat(w1).hasSize(3)
|
||||
|
||||
@ -126,8 +126,9 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `states soft locking reserve and release`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
}
|
||||
database.transaction {
|
||||
|
||||
val unconsumedStates = vaultQuery.queryBy<Cash.State>().states
|
||||
assertThat(unconsumedStates).hasSize(3)
|
||||
@ -309,8 +310,9 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `unconsumedStatesForSpending exact amount`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L))
|
||||
}
|
||||
database.transaction {
|
||||
|
||||
val unconsumedStates = vaultQuery.queryBy<Cash.State>().states
|
||||
assertThat(unconsumedStates).hasSize(1)
|
||||
@ -327,10 +329,10 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `unconsumedStatesForSpending from two issuer parties`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L), issuedBy = (DUMMY_CASH_ISSUER))
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L), issuedBy = (BOC.ref(1)))
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val spendableStatesUSD = vaultSvc.unconsumedCashStatesForSpending(200.DOLLARS,
|
||||
onlyFromIssuerParties = setOf(DUMMY_CASH_ISSUER.party, BOC))
|
||||
spendableStatesUSD.forEach(::println)
|
||||
@ -344,12 +346,12 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `unconsumedStatesForSpending from specific issuer party and refs`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L), issuedBy = (DUMMY_CASH_ISSUER))
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L), issuedBy = (BOC.ref(1)), ref = OpaqueBytes.of(1))
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L), issuedBy = (BOC.ref(2)), ref = OpaqueBytes.of(2))
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L), issuedBy = (BOC.ref(3)), ref = OpaqueBytes.of(3))
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val unconsumedStates = vaultQuery.queryBy<Cash.State>().states
|
||||
assertThat(unconsumedStates).hasSize(4)
|
||||
|
||||
@ -366,9 +368,9 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `unconsumedStatesForSpending insufficient amount`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L))
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val unconsumedStates = vaultQuery.queryBy<Cash.State>().states
|
||||
assertThat(unconsumedStates).hasSize(1)
|
||||
|
||||
@ -383,9 +385,9 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `unconsumedStatesForSpending small amount`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 2, 2, Random(0L))
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val unconsumedStates = vaultQuery.queryBy<Cash.State>().states
|
||||
assertThat(unconsumedStates).hasSize(2)
|
||||
|
||||
@ -401,11 +403,11 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `states soft locking query granularity`() {
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 10, 10, Random(0L))
|
||||
services.fillWithSomeTestCash(100.POUNDS, issuerServices, DUMMY_NOTARY, 10, 10, Random(0L))
|
||||
services.fillWithSomeTestCash(100.SWISS_FRANCS, issuerServices, DUMMY_NOTARY, 10, 10, Random(0L))
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
var unlockedStates = 30
|
||||
val allStates = vaultQuery.queryBy<Cash.State>().states
|
||||
assertThat(allStates).hasSize(unlockedStates)
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -61,7 +61,8 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
database.transaction {
|
||||
// Fix the PRNG so that we get the same splits every time.
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 3, 3, Random(0L), issuedBy = DUMMY_CASH_ISSUER)
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val w = vaultQuery.queryBy<Cash.State>().states
|
||||
assertEquals(3, w.size)
|
||||
|
||||
@ -77,25 +78,31 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `issue and spend total correctly and irrelevant ignored`() {
|
||||
val megaCorpServices = MockServices(MEGA_CORP_KEY)
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
|
||||
val usefulTX =
|
||||
database.transaction {
|
||||
// A tx that sends us money.
|
||||
val usefulBuilder = TransactionBuilder(null)
|
||||
Cash().generateIssue(usefulBuilder, 100.DOLLARS `issued by` MEGA_CORP.ref(1), AnonymousParty(freshKey), DUMMY_NOTARY)
|
||||
megaCorpServices.signInitialTransaction(usefulBuilder)
|
||||
}
|
||||
database.transaction {
|
||||
// A tx that sends us money.
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
val usefulBuilder = TransactionBuilder(null)
|
||||
Cash().generateIssue(usefulBuilder, 100.DOLLARS `issued by` MEGA_CORP.ref(1), AnonymousParty(freshKey), DUMMY_NOTARY)
|
||||
val usefulTX = megaCorpServices.signInitialTransaction(usefulBuilder)
|
||||
|
||||
assertEquals(0.DOLLARS, services.getCashBalance(USD))
|
||||
services.recordTransactions(usefulTX)
|
||||
|
||||
// A tx that spends our money.
|
||||
val spendTXBuilder = TransactionBuilder(DUMMY_NOTARY)
|
||||
Cash.generateSpend(services, spendTXBuilder, 80.DOLLARS, BOB)
|
||||
val spendPTX = services.signInitialTransaction(spendTXBuilder, freshKey)
|
||||
val spendTX = notaryServices.addSignature(spendPTX)
|
||||
|
||||
}
|
||||
val spendTX =
|
||||
database.transaction {
|
||||
// A tx that spends our money.
|
||||
val spendTXBuilder = TransactionBuilder(DUMMY_NOTARY)
|
||||
Cash.generateSpend(services, spendTXBuilder, 80.DOLLARS, BOB)
|
||||
val spendPTX = services.signInitialTransaction(spendTXBuilder, freshKey)
|
||||
notaryServices.addSignature(spendPTX)
|
||||
}
|
||||
database.transaction {
|
||||
assertEquals(100.DOLLARS, services.getCashBalance(USD))
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
// A tx that doesn't send us anything.
|
||||
val irrelevantBuilder = TransactionBuilder(DUMMY_NOTARY)
|
||||
Cash().generateIssue(irrelevantBuilder, 100.DOLLARS `issued by` MEGA_CORP.ref(1), BOB, DUMMY_NOTARY)
|
||||
@ -104,12 +111,15 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
val irrelevantTX = notaryServices.addSignature(irrelevantPTX)
|
||||
|
||||
services.recordTransactions(irrelevantTX)
|
||||
}
|
||||
database.transaction {
|
||||
assertEquals(100.DOLLARS, services.getCashBalance(USD))
|
||||
}
|
||||
database.transaction {
|
||||
services.recordTransactions(spendTX)
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
assertEquals(20.DOLLARS, services.getCashBalance(USD))
|
||||
|
||||
// TODO: Flesh out these tests as needed.
|
||||
}
|
||||
}
|
||||
|
||||
@ -123,7 +133,8 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 10, 10, Random(0L), ownedBy = AnonymousParty(freshKey),
|
||||
issuedBy = MEGA_CORP.ref(1))
|
||||
println("Cash balance: ${services.getCashBalance(USD)}")
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
assertThat(vaultQuery.queryBy<Cash.State>().states).hasSize(10)
|
||||
assertThat(vaultQuery.queryBy<Cash.State>(criteriaLocked).states).hasSize(0)
|
||||
}
|
||||
@ -234,22 +245,23 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
|
||||
@Test
|
||||
fun `sequencing LinearStates works`() {
|
||||
database.transaction {
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
val freshIdentity = AnonymousParty(freshKey)
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
val freshIdentity = AnonymousParty(freshKey)
|
||||
val linearId = UniqueIdentifier()
|
||||
|
||||
val linearId = UniqueIdentifier()
|
||||
|
||||
// Issue a linear state
|
||||
val dummyIssue =
|
||||
database.transaction { // Issue a linear state
|
||||
val dummyIssueBuilder = TransactionBuilder(notary = DUMMY_NOTARY)
|
||||
.addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshIdentity)))
|
||||
.addCommand(dummyCommand(notaryServices.legalIdentityKey))
|
||||
.addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshIdentity))).addCommand(dummyCommand(notaryServices.legalIdentityKey))
|
||||
val dummyIssuePtx = notaryServices.signInitialTransaction(dummyIssueBuilder)
|
||||
val dummyIssue = services.addSignature(dummyIssuePtx)
|
||||
|
||||
dummyIssue.toLedgerTransaction(services).verify()
|
||||
dummyIssue.toLedgerTransaction(services).verify()
|
||||
|
||||
services.recordTransactions(dummyIssue)
|
||||
services.recordTransactions(dummyIssue)
|
||||
dummyIssue
|
||||
}
|
||||
database.transaction {
|
||||
assertThat(vaultQuery.queryBy<DummyLinearContract.State>().states).hasSize(1)
|
||||
|
||||
// Move the same state
|
||||
@ -263,6 +275,8 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
dummyIssue.toLedgerTransaction(services).verify()
|
||||
|
||||
services.recordTransactions(dummyMove)
|
||||
}
|
||||
database.transaction {
|
||||
assertThat(vaultQuery.queryBy<DummyLinearContract.State>().states).hasSize(1)
|
||||
}
|
||||
}
|
||||
@ -275,10 +289,15 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
services.fillWithSomeTestCash(100.DOLLARS, issuerServices, DUMMY_NOTARY, 3, 3, Random(0L), ownedBy = AnonymousParty(freshKey))
|
||||
services.fillWithSomeTestCash(100.SWISS_FRANCS, issuerServices, DUMMY_NOTARY, 2, 2, Random(0L))
|
||||
services.fillWithSomeTestCash(100.POUNDS, issuerServices, DUMMY_NOTARY, 1, 1, Random(0L))
|
||||
}
|
||||
database.transaction {
|
||||
val cash = vaultQuery.queryBy<Cash.State>().states
|
||||
cash.forEach { println(it.state.data.amount) }
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
services.fillWithSomeTestDeals(listOf("123", "456", "789"))
|
||||
}
|
||||
database.transaction {
|
||||
val deals = vaultQuery.queryBy<DummyDealContract.State>().states
|
||||
deals.forEach { println(it.state.data.linearId.externalId!!) }
|
||||
}
|
||||
@ -290,7 +309,8 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
val spendPTX = notaryServices.signInitialTransaction(spendTXBuilder)
|
||||
val spendTX = services.addSignature(spendPTX, freshKey)
|
||||
services.recordTransactions(spendTX)
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val consumedStates = vaultQuery.queryBy<ContractState>(VaultQueryCriteria(status = Vault.StateStatus.CONSUMED)).states
|
||||
assertEquals(3, consumedStates.count())
|
||||
|
||||
@ -300,17 +320,21 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consuming multiple contract state types in same transaction`() {
|
||||
fun `consuming multiple contract state types`() {
|
||||
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
val freshIdentity = AnonymousParty(freshKey)
|
||||
database.transaction {
|
||||
|
||||
services.fillWithSomeTestDeals(listOf("123", "456", "789"))
|
||||
val deals = vaultQuery.queryBy<DummyDealContract.State>().states
|
||||
deals.forEach { println(it.state.data.linearId.externalId!!) }
|
||||
|
||||
}
|
||||
val deals =
|
||||
database.transaction {
|
||||
vaultQuery.queryBy<DummyDealContract.State>().states
|
||||
}
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(3)
|
||||
}
|
||||
database.transaction {
|
||||
val linearStates = vaultQuery.queryBy<DummyLinearContract.State>().states
|
||||
linearStates.forEach { println(it.state.data.linearId) }
|
||||
|
||||
@ -324,10 +348,10 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
|
||||
}
|
||||
|
||||
val dummyMove = notaryServices.signInitialTransaction(dummyMoveBuilder)
|
||||
|
||||
dummyMove.toLedgerTransaction(services).verify()
|
||||
services.recordTransactions(dummyMove)
|
||||
|
||||
}
|
||||
database.transaction {
|
||||
val consumedStates = vaultQuery.queryBy<ContractState>(VaultQueryCriteria(status = Vault.StateStatus.CONSUMED)).states
|
||||
assertEquals(2, consumedStates.count())
|
||||
|
||||
|
@ -6,7 +6,6 @@ include 'finance:isolated'
|
||||
include 'core'
|
||||
include 'docs'
|
||||
include 'node-api'
|
||||
include 'node-schemas'
|
||||
include 'node'
|
||||
include 'node:capsule'
|
||||
include 'client:jackson'
|
||||
|
@ -87,8 +87,8 @@ open class MockServices(vararg val keys: KeyPair) : ServiceHub {
|
||||
|
||||
lateinit var hibernatePersister: HibernateObserver
|
||||
|
||||
fun makeVaultService(dataSourceProps: Properties, hibernateConfig: HibernateConfiguration = HibernateConfiguration(NodeSchemaService(), makeTestDatabaseProperties(), { identityService })): VaultService {
|
||||
val vaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties())
|
||||
fun makeVaultService(hibernateConfig: HibernateConfiguration = HibernateConfiguration(NodeSchemaService(), makeTestDatabaseProperties(), { identityService })): VaultService {
|
||||
val vaultService = NodeVaultService(this)
|
||||
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
|
||||
return vaultService
|
||||
}
|
||||
@ -227,7 +227,7 @@ fun makeTestDatabaseAndMockServices(customSchemas: Set<MappedSchema> = setOf(Com
|
||||
val mockService = database.transaction {
|
||||
val hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas), databaseProperties, identitySvc = identitySvc)
|
||||
object : MockServices(*(keys.toTypedArray())) {
|
||||
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)
|
||||
override val vaultService: VaultService = makeVaultService(hibernateConfig)
|
||||
|
||||
override fun recordTransactions(notifyVault: Boolean, txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
|
Loading…
Reference in New Issue
Block a user