mirror of
https://github.com/corda/corda.git
synced 2025-06-18 15:18:16 +00:00
Vault Query Service JPA implementation (#840)
* Vault Query Service API implementation using JPA Hibernate Added queryBy(QueryCriteria) Vault API and Junit tests. Minor cosmetic API changes following rebase. Fixes following rebase from master Upgraded to requery 1.3.1 WIP - removed 'latestOnly' from LinearStateQueryCriteria WIP - CommercialSchemas V2, V3, V4 testing WIP - sort out generics handling. WIP - most general queries completed. WIP - join queries, contractStateType derivation WIP - refactoring Requery WIP - refactored VaultService to extract a VaultQueryService interface (and associated Requery implementation). WIP - HibernateVaultQuery implementation WIP - Re-structured all Schema definitions (requery/jpa) and make Hibernate Config reusable. WIP - Multi-version schema testing, hibernate query testing. WIP - Custom Criteria and Fungible Criteria impl & testing. WIP - Kotlin Comparable Generics error WIP - Party queries all working now WIP - All VaultQueryTests now working (refactored for AND / OR composition) WIP - added schema registration in CordaPluginRegistry to enable custom vault queries on arbitrary schemas. WIP - added new default Sort NULL order to be NONE + added lots more tests for Logical Operator testing. Mostly identity fixes following rebase from master. Exception handling and public API cleanup in prep for PR. Additional tests for Logical Operators; additional tests for NULLS sort ordering; additional logging; Additional parser to handle Nullable attribute values; added Unary and Collection logical expression handlers Lots of cleanup: participants; trackBy interfaces; additional fungible tests; parser cleanup and improved support for Java Removed all traces of Requery implementation. Further minor cleanup and Junit test fix. Final identity and schema related identity clean-up. Revert unrelated changes. PR review updates: blank lines, isRelevant. Fixed wiring of updatesPublisher for dynamic trackBy queries. PR review changes: multi-versioned schema samples and associated dummy contracts moved to test packages. Fixed problem with sorted queries (not specifying any filterable criteria). PR review: minor updates to address RP comments. Typesafe custom query criteria Cleanup: remove redundant tests. Further clean-up and make all Java test work successfully. Remove debugging print statements. Rebased from master - changes required due to DealState module change. fixed broken assertion caused by DealState ordering change (different package) Fixed transaction demarcation issue causing "java.lang.IllegalStateException: Was not expecting to find existing database transaction on current strand" trackBy() now filters on ContractType and StateStatus (CONSUMED, UNCONSUMED, ALL) Added tests to exercise RPCOps trackBy and queryBy (RPC smoke test and CordaRPCOps) Added additional @CordaSerializable annotations. Updated documentation and referenced sample code. Added deprecation annotations. Re-added missing deprecation annotation. Hibernate debug logging is now configurable and disabled by default. Introduced common Sort attributes based on the node schemas. Completely removed NULL_HANDLING sort parameter as this is not supported in JPA. Revisited and fixed usage of @CordaSerializable. * Minor fix following rebase from master. * Remove blank line as per RP PR feedback request. * Minor Java documentation and example clean-up. * Disable BFT Notary Service tests.
This commit is contained in:
@ -22,6 +22,7 @@ import net.corda.node.utilities.ServiceIdentityGenerator
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.NodeBasedTest
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.nio.file.Files
|
||||
import kotlin.test.*
|
||||
@ -52,11 +53,13 @@ class BFTNotaryServiceTests : NodeBasedTest() {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Under investigation due to failure on TC build server")
|
||||
fun `detect double spend 1 faulty`() {
|
||||
detectDoubleSpend(1)
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Under investigation due to failure on TC build server")
|
||||
fun `detect double spend 2 faulty`() {
|
||||
detectDoubleSpend(2)
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import net.corda.node.services.api.*
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.services.events.NodeSchedulerService
|
||||
import net.corda.node.services.events.ScheduledActivityObserver
|
||||
import net.corda.node.services.identity.InMemoryIdentityService
|
||||
@ -51,6 +52,7 @@ import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.statemachine.flowVersionAndInitiatingClass
|
||||
import net.corda.node.services.transactions.*
|
||||
import net.corda.node.services.vault.CashBalanceAsMetricsObserver
|
||||
import net.corda.node.services.vault.HibernateVaultQueryImpl
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.VaultSoftLockManager
|
||||
import net.corda.node.utilities.AddOrRemove.ADD
|
||||
@ -121,6 +123,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
override val networkMapCache: NetworkMapCacheInternal get() = netMapCache
|
||||
override val storageService: TxWritableStorageService get() = storage
|
||||
override val vaultService: VaultService get() = vault
|
||||
override val vaultQueryService: VaultQueryService get() = vaultQuery
|
||||
override val keyManagementService: KeyManagementService get() = keyManagement
|
||||
override val identityService: IdentityService get() = identity
|
||||
override val schedulerService: SchedulerService get() = scheduler
|
||||
@ -164,6 +167,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
lateinit var checkpointStorage: CheckpointStorage
|
||||
lateinit var smm: StateMachineManager
|
||||
lateinit var vault: VaultService
|
||||
lateinit var vaultQuery: VaultQueryService
|
||||
lateinit var keyManagement: KeyManagementService
|
||||
var inNodeNetworkMapService: NetworkMapService? = null
|
||||
lateinit var txVerifierService: TransactionVerifierService
|
||||
@ -447,6 +451,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
network = makeMessagingService()
|
||||
schemas = makeSchemaService()
|
||||
vault = makeVaultService(configuration.dataSourceProperties)
|
||||
vaultQuery = makeVaultQueryService(schemas)
|
||||
txVerifierService = makeTransactionVerifierService()
|
||||
auditService = DummyAuditService()
|
||||
|
||||
@ -525,7 +530,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
VaultSoftLockManager(vault, smm)
|
||||
CashBalanceAsMetricsObserver(services, database)
|
||||
ScheduledActivityObserver(services)
|
||||
HibernateObserver(vault.rawUpdates, schemas)
|
||||
HibernateObserver(vault.rawUpdates, HibernateConfiguration(schemas))
|
||||
}
|
||||
|
||||
private fun makeInfo(): NodeInfo {
|
||||
@ -704,7 +709,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
// TODO: sort out ordering of open & protected modifiers of functions in this class.
|
||||
protected open fun makeVaultService(dataSourceProperties: Properties): VaultService = NodeVaultService(services, dataSourceProperties)
|
||||
|
||||
protected open fun makeSchemaService(): SchemaService = NodeSchemaService()
|
||||
protected open fun makeVaultQueryService(schemas: SchemaService): VaultQueryService = HibernateVaultQueryImpl(HibernateConfiguration(schemas), vault.updatesPublisher)
|
||||
|
||||
protected open fun makeSchemaService(): SchemaService = NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet())
|
||||
|
||||
protected abstract fun makeTransactionVerifierService(): TransactionVerifierService
|
||||
|
||||
|
@ -14,6 +14,7 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.StateMachineTransactionMapping
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.queryBy
|
||||
import net.corda.core.node.services.vault.PageSpecification
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.Sort
|
||||
@ -59,7 +60,7 @@ class CordaRPCOpsImpl(
|
||||
paging: PageSpecification,
|
||||
sorting: Sort): Vault.Page<T> {
|
||||
return database.transaction {
|
||||
services.vaultService.queryBy<T>(criteria, paging, sorting)
|
||||
services.vaultQueryService._queryBy(criteria, paging, sorting, ContractState::class.java as Class<T>)
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,7 +69,7 @@ class CordaRPCOpsImpl(
|
||||
paging: PageSpecification,
|
||||
sorting: Sort): Vault.PageAndUpdates<T> {
|
||||
return database.transaction {
|
||||
services.vaultService.trackBy<T>(criteria, paging, sorting)
|
||||
services.vaultQueryService._trackBy<T>(criteria, paging, sorting, ContractState::class.java as Class<T>)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
package net.corda.node.services.api
|
||||
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
|
||||
//DOCSTART SchemaService
|
||||
/**
|
||||
@ -23,12 +23,12 @@ interface SchemaService {
|
||||
* Given a state, select schemas to map it to that are supported by [generateMappedObject] and that are configured
|
||||
* for this node.
|
||||
*/
|
||||
fun selectSchemas(state: QueryableState): Iterable<MappedSchema>
|
||||
fun selectSchemas(state: ContractState): Iterable<MappedSchema>
|
||||
|
||||
/**
|
||||
* Map a state to a [PersistentState] for the given schema, either via direct support from the state
|
||||
* or via custom logic in this service.
|
||||
*/
|
||||
fun generateMappedObject(state: QueryableState, schema: MappedSchema): PersistentState
|
||||
fun generateMappedObject(state: ContractState, schema: MappedSchema): PersistentState
|
||||
}
|
||||
//DOCEND SchemaService
|
||||
|
@ -0,0 +1,119 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import org.hibernate.SessionFactory
|
||||
import org.hibernate.boot.MetadataSources
|
||||
import org.hibernate.boot.model.naming.Identifier
|
||||
import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
|
||||
import org.hibernate.boot.registry.BootstrapServiceRegistryBuilder
|
||||
import org.hibernate.cfg.Configuration
|
||||
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider
|
||||
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment
|
||||
import org.hibernate.service.UnknownUnwrapTypeException
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class HibernateConfiguration(val schemaService: SchemaService, val useDefaultLogging: Boolean = false) {
|
||||
constructor(schemaService: SchemaService) : this(schemaService, false)
|
||||
|
||||
companion object {
|
||||
val logger = loggerFor<HibernateConfiguration>()
|
||||
}
|
||||
|
||||
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
|
||||
val sessionFactories = ConcurrentHashMap<MappedSchema, SessionFactory>()
|
||||
|
||||
init {
|
||||
schemaService.schemaOptions.map { it.key }.forEach { mappedSchema ->
|
||||
sessionFactories.computeIfAbsent(mappedSchema, { makeSessionFactoryForSchema(mappedSchema) })
|
||||
}
|
||||
}
|
||||
|
||||
fun sessionFactoryForRegisteredSchemas(): SessionFactory {
|
||||
return sessionFactoryForSchemas(*schemaService.schemaOptions.map { it.key }.toTypedArray())
|
||||
}
|
||||
|
||||
fun sessionFactoryForSchema(schema: MappedSchema): SessionFactory {
|
||||
return sessionFactories.computeIfAbsent(schema, { sessionFactoryForSchemas(schema) })
|
||||
}
|
||||
|
||||
fun sessionFactoryForSchemas(vararg schemas: MappedSchema): SessionFactory {
|
||||
return makeSessionFactoryForSchemas(schemas.iterator())
|
||||
}
|
||||
|
||||
private fun makeSessionFactoryForSchema(schema: MappedSchema): SessionFactory {
|
||||
return makeSessionFactoryForSchemas(setOf(schema).iterator())
|
||||
}
|
||||
|
||||
private fun makeSessionFactoryForSchemas(schemas: Iterator<MappedSchema>): SessionFactory {
|
||||
logger.info("Creating session factory for schemas: $schemas")
|
||||
val serviceRegistry = BootstrapServiceRegistryBuilder().build()
|
||||
val metadataSources = MetadataSources(serviceRegistry)
|
||||
// We set a connection provider as the auto schema generation requires it. The auto schema generation will not
|
||||
// necessarily remain and would likely be replaced by something like Liquibase. For now it is very convenient though.
|
||||
// TODO: replace auto schema generation as it isn't intended for production use, according to Hibernate docs.
|
||||
val config = Configuration(metadataSources).setProperty("hibernate.connection.provider_class", HibernateConfiguration.NodeDatabaseConnectionProvider::class.java.name)
|
||||
.setProperty("hibernate.hbm2ddl.auto", "update")
|
||||
.setProperty("hibernate.show_sql", "$useDefaultLogging")
|
||||
.setProperty("hibernate.format_sql", "$useDefaultLogging")
|
||||
schemas.forEach { schema ->
|
||||
// TODO: require mechanism to set schemaOptions (databaseSchema, tablePrefix) which are not global to session
|
||||
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
|
||||
}
|
||||
val sessionFactory = buildSessionFactory(config, metadataSources, "")
|
||||
logger.info("Created session factory for schemas: $schemas")
|
||||
return sessionFactory
|
||||
}
|
||||
|
||||
private fun buildSessionFactory(config: Configuration, metadataSources: MetadataSources, tablePrefix: String): SessionFactory {
|
||||
config.standardServiceRegistryBuilder.applySettings(config.properties)
|
||||
val metadata = metadataSources.getMetadataBuilder(config.standardServiceRegistryBuilder.build()).run {
|
||||
applyPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() {
|
||||
override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier {
|
||||
val default = super.toPhysicalTableName(name, context)
|
||||
return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted)
|
||||
}
|
||||
})
|
||||
build()
|
||||
}
|
||||
|
||||
return metadata.sessionFactoryBuilder.run {
|
||||
allowOutOfTransactionUpdateOperations(true)
|
||||
applySecondLevelCacheSupport(false)
|
||||
applyQueryCacheSupport(false)
|
||||
enableReleaseResourcesOnCloseEnabled(true)
|
||||
build()
|
||||
}
|
||||
}
|
||||
|
||||
// Supply Hibernate with connections from our underlying Exposed database integration. Only used
|
||||
// during schema creation / update.
|
||||
class NodeDatabaseConnectionProvider : ConnectionProvider {
|
||||
override fun closeConnection(conn: Connection) {
|
||||
val tx = TransactionManager.current()
|
||||
tx.commit()
|
||||
tx.close()
|
||||
}
|
||||
|
||||
override fun supportsAggressiveRelease(): Boolean = true
|
||||
|
||||
override fun getConnection(): Connection {
|
||||
val tx = TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
return tx.connection
|
||||
}
|
||||
|
||||
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
|
||||
try {
|
||||
return unwrapType.cast(this)
|
||||
} catch(e: ClassCastException) {
|
||||
throw UnknownUnwrapTypeException(unwrapType)
|
||||
}
|
||||
}
|
||||
|
||||
override fun isUnwrappableAs(unwrapType: Class<*>?): Boolean = (unwrapType == NodeDatabaseConnectionProvider::class.java)
|
||||
}
|
||||
}
|
@ -21,8 +21,8 @@ import net.corda.core.serialization.SerializeAsTokenContext
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.AcceptsFileUpload
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.persistence.schemas.AttachmentEntity
|
||||
import net.corda.node.services.persistence.schemas.Models
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.persistence.schemas.requery.Models
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.FilterInputStream
|
||||
import java.io.IOException
|
||||
|
@ -9,70 +9,25 @@ import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import org.hibernate.FlushMode
|
||||
import org.hibernate.SessionFactory
|
||||
import org.hibernate.boot.MetadataSources
|
||||
import org.hibernate.boot.model.naming.Identifier
|
||||
import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
|
||||
import org.hibernate.boot.registry.BootstrapServiceRegistryBuilder
|
||||
import org.hibernate.cfg.Configuration
|
||||
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider
|
||||
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment
|
||||
import org.hibernate.service.UnknownUnwrapTypeException
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import rx.Observable
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
/**
|
||||
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
|
||||
*/
|
||||
// TODO: Manage version evolution of the schemas via additional tooling.
|
||||
class HibernateObserver(vaultUpdates: Observable<Vault.Update>, val schemaService: SchemaService) {
|
||||
class HibernateObserver(vaultUpdates: Observable<Vault.Update>, val config: HibernateConfiguration) {
|
||||
|
||||
companion object {
|
||||
val logger = loggerFor<HibernateObserver>()
|
||||
}
|
||||
|
||||
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
|
||||
val sessionFactories = ConcurrentHashMap<MappedSchema, SessionFactory>()
|
||||
|
||||
init {
|
||||
schemaService.schemaOptions.map { it.key }.forEach {
|
||||
makeSessionFactoryForSchema(it)
|
||||
}
|
||||
vaultUpdates.subscribe { persist(it.produced) }
|
||||
}
|
||||
|
||||
private fun sessionFactoryForSchema(schema: MappedSchema): SessionFactory {
|
||||
return sessionFactories.computeIfAbsent(schema, { makeSessionFactoryForSchema(it) })
|
||||
}
|
||||
|
||||
private fun makeSessionFactoryForSchema(schema: MappedSchema): SessionFactory {
|
||||
logger.info("Creating session factory for schema $schema")
|
||||
val serviceRegistry = BootstrapServiceRegistryBuilder().build()
|
||||
val metadataSources = MetadataSources(serviceRegistry)
|
||||
// We set a connection provider as the auto schema generation requires it. The auto schema generation will not
|
||||
// necessarily remain and would likely be replaced by something like Liquibase. For now it is very convenient though.
|
||||
// TODO: replace auto schema generation as it isn't intended for production use, according to Hibernate docs.
|
||||
val config = Configuration(metadataSources).setProperty("hibernate.connection.provider_class", NodeDatabaseConnectionProvider::class.java.name)
|
||||
.setProperty("hibernate.hbm2ddl.auto", "update")
|
||||
.setProperty("hibernate.show_sql", "false")
|
||||
.setProperty("hibernate.format_sql", "true")
|
||||
val options = schemaService.schemaOptions[schema]
|
||||
val databaseSchema = options?.databaseSchema
|
||||
if (databaseSchema != null) {
|
||||
logger.debug { "Database schema = $databaseSchema" }
|
||||
config.setProperty("hibernate.default_schema", databaseSchema)
|
||||
}
|
||||
val tablePrefix = options?.tablePrefix ?: "contract_" // We always have this as the default for aesthetic reasons.
|
||||
logger.debug { "Table prefix = $tablePrefix" }
|
||||
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
|
||||
val sessionFactory = buildSessionFactory(config, metadataSources, tablePrefix)
|
||||
logger.info("Created session factory for schema $schema")
|
||||
return sessionFactory
|
||||
}
|
||||
|
||||
private fun persist(produced: Set<StateAndRef<ContractState>>) {
|
||||
produced.forEach { persistState(it) }
|
||||
}
|
||||
@ -81,69 +36,21 @@ class HibernateObserver(vaultUpdates: Observable<Vault.Update>, val schemaServic
|
||||
val state = stateAndRef.state.data
|
||||
if (state is QueryableState) {
|
||||
logger.debug { "Asked to persist state ${stateAndRef.ref}" }
|
||||
schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
|
||||
config.schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun persistStateWithSchema(state: QueryableState, stateRef: StateRef, schema: MappedSchema) {
|
||||
val sessionFactory = sessionFactoryForSchema(schema)
|
||||
fun persistStateWithSchema(state: QueryableState, stateRef: StateRef, schema: MappedSchema) {
|
||||
val sessionFactory = config.sessionFactoryForSchema(schema)
|
||||
val session = sessionFactory.withOptions().
|
||||
connection(TransactionManager.current().connection).
|
||||
flushMode(FlushMode.MANUAL).
|
||||
openSession()
|
||||
session.use {
|
||||
val mappedObject = schemaService.generateMappedObject(state, schema)
|
||||
val mappedObject = config.schemaService.generateMappedObject(state, schema)
|
||||
mappedObject.stateRef = PersistentStateRef(stateRef)
|
||||
it.persist(mappedObject)
|
||||
it.flush()
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildSessionFactory(config: Configuration, metadataSources: MetadataSources, tablePrefix: String): SessionFactory {
|
||||
config.standardServiceRegistryBuilder.applySettings(config.properties)
|
||||
val metadata = metadataSources.getMetadataBuilder(config.standardServiceRegistryBuilder.build()).run {
|
||||
applyPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() {
|
||||
override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier {
|
||||
val default = super.toPhysicalTableName(name, context)
|
||||
return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted)
|
||||
}
|
||||
})
|
||||
build()
|
||||
}
|
||||
|
||||
return metadata.sessionFactoryBuilder.run {
|
||||
allowOutOfTransactionUpdateOperations(true)
|
||||
applySecondLevelCacheSupport(false)
|
||||
applyQueryCacheSupport(false)
|
||||
enableReleaseResourcesOnCloseEnabled(true)
|
||||
build()
|
||||
}
|
||||
}
|
||||
|
||||
// Supply Hibernate with connections from our underlying Exposed database integration. Only used
|
||||
// during schema creation / update.
|
||||
class NodeDatabaseConnectionProvider : ConnectionProvider {
|
||||
override fun closeConnection(conn: Connection) {
|
||||
val tx = TransactionManager.current()
|
||||
tx.commit()
|
||||
tx.close()
|
||||
}
|
||||
|
||||
override fun supportsAggressiveRelease(): Boolean = true
|
||||
|
||||
override fun getConnection(): Connection {
|
||||
val tx = TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
return tx.connection
|
||||
}
|
||||
|
||||
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
|
||||
try {
|
||||
return unwrapType.cast(this)
|
||||
} catch(e: ClassCastException) {
|
||||
throw UnknownUnwrapTypeException(unwrapType)
|
||||
}
|
||||
}
|
||||
|
||||
override fun isUnwrappableAs(unwrapType: Class<*>?): Boolean = (unwrapType == NodeDatabaseConnectionProvider::class.java)
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,16 @@
|
||||
package net.corda.node.services.schema
|
||||
|
||||
import net.corda.contracts.DealState
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.FungibleAsset
|
||||
import net.corda.core.contracts.LinearState
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.vault.schemas.jpa.CommonSchemaV1
|
||||
import net.corda.node.services.vault.schemas.jpa.VaultSchemaV1
|
||||
import net.corda.schemas.CashSchemaV1
|
||||
|
||||
/**
|
||||
@ -15,21 +21,46 @@ import net.corda.schemas.CashSchemaV1
|
||||
* TODO: support plugins for schema version upgrading or custom mapping not supported by original [QueryableState].
|
||||
* TODO: create whitelisted tables when a CorDapp is first installed
|
||||
*/
|
||||
class NodeSchemaService : SchemaService, SingletonSerializeAsToken() {
|
||||
class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaService, SingletonSerializeAsToken() {
|
||||
|
||||
// Currently does not support configuring schema options.
|
||||
|
||||
// Whitelisted tables are those required by internal Corda services
|
||||
// For example, cash is used by the vault for coin selection
|
||||
// This whitelist will grow as we add further functionality (eg. other fungible assets)
|
||||
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = mapOf(Pair(CashSchemaV1, SchemaService.SchemaOptions()))
|
||||
// Required schemas are those used by internal Corda services
|
||||
// For example, cash is used by the vault for coin selection (but will be extracted as a standalone CorDapp in future)
|
||||
val requiredSchemas: Map<MappedSchema, SchemaService.SchemaOptions> =
|
||||
mapOf(Pair(CashSchemaV1, SchemaService.SchemaOptions()),
|
||||
Pair(CommonSchemaV1, SchemaService.SchemaOptions()),
|
||||
Pair(VaultSchemaV1, SchemaService.SchemaOptions()))
|
||||
|
||||
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas.plus(customSchemas.map {
|
||||
mappedSchema -> Pair(mappedSchema, SchemaService.SchemaOptions())
|
||||
})
|
||||
|
||||
// Currently returns all schemas supported by the state, with no filtering or enrichment.
|
||||
override fun selectSchemas(state: QueryableState): Iterable<MappedSchema> {
|
||||
return state.supportedSchemas()
|
||||
override fun selectSchemas(state: ContractState): Iterable<MappedSchema> {
|
||||
val schemas = mutableSetOf<MappedSchema>()
|
||||
if (state is QueryableState)
|
||||
schemas += state.supportedSchemas()
|
||||
if (state is LinearState)
|
||||
schemas += VaultSchemaV1 // VaultLinearStates
|
||||
// TODO: DealState to be deprecated (collapsed into LinearState)
|
||||
if (state is DealState)
|
||||
schemas += VaultSchemaV1 // VaultLinearStates
|
||||
if (state is FungibleAsset<*>)
|
||||
schemas += VaultSchemaV1 // VaultFungibleStates
|
||||
|
||||
return schemas
|
||||
}
|
||||
|
||||
// Because schema is always one supported by the state, just delegate.
|
||||
override fun generateMappedObject(state: QueryableState, schema: MappedSchema): PersistentState {
|
||||
return state.generateMappedObject(schema)
|
||||
override fun generateMappedObject(state: ContractState, schema: MappedSchema): PersistentState {
|
||||
// TODO: DealState to be deprecated (collapsed into LinearState)
|
||||
if ((schema is VaultSchemaV1) && (state is DealState))
|
||||
return VaultSchemaV1.VaultLinearStates(state.linearId, state.ref, state.participants)
|
||||
if ((schema is VaultSchemaV1) && (state is LinearState))
|
||||
return VaultSchemaV1.VaultLinearStates(state.linearId, "", state.participants)
|
||||
if ((schema is VaultSchemaV1) && (state is FungibleAsset<*>))
|
||||
return VaultSchemaV1.VaultFungibleStates(state.owner, state.amount.quantity, state.amount.token.issuer.party, state.amount.token.issuer.reference, state.participants)
|
||||
return (state as QueryableState).generateMappedObject(schema)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,368 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultQueryException
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.OpaqueBytes
|
||||
import net.corda.core.serialization.toHexString
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.vault.schemas.jpa.CommonSchemaV1
|
||||
import net.corda.node.services.vault.schemas.jpa.VaultSchemaV1
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import java.util.*
|
||||
import javax.persistence.Tuple
|
||||
import javax.persistence.criteria.*
|
||||
|
||||
|
||||
class HibernateQueryCriteriaParser(val contractType: Class<out ContractState>,
|
||||
val contractTypeMappings: Map<String, List<String>>,
|
||||
val criteriaBuilder: CriteriaBuilder,
|
||||
val criteriaQuery: CriteriaQuery<Tuple>,
|
||||
val vaultStates: Root<VaultSchemaV1.VaultStates>) : IQueryCriteriaParser {
|
||||
private companion object {
|
||||
val log = loggerFor<HibernateQueryCriteriaParser>()
|
||||
}
|
||||
|
||||
// incrementally build list of join predicates
|
||||
private val joinPredicates = mutableListOf<Predicate>()
|
||||
// incrementally build list of root entities (for later use in Sort parsing)
|
||||
private val rootEntities = mutableMapOf<Class<out PersistentState>, Root<*>>()
|
||||
|
||||
var stateTypes: Vault.StateStatus = Vault.StateStatus.UNCONSUMED
|
||||
|
||||
override fun parseCriteria(criteria: QueryCriteria.VaultQueryCriteria) : Collection<Predicate> {
|
||||
log.trace { "Parsing VaultQueryCriteria: $criteria" }
|
||||
val predicateSet = mutableSetOf<Predicate>()
|
||||
|
||||
// state status
|
||||
stateTypes = criteria.status
|
||||
if (criteria.status == Vault.StateStatus.ALL)
|
||||
predicateSet.add(vaultStates.get<Vault.StateStatus>("stateStatus").`in`(setOf(Vault.StateStatus.UNCONSUMED, Vault.StateStatus.CONSUMED)))
|
||||
else
|
||||
predicateSet.add(criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>("stateStatus"), criteria.status))
|
||||
|
||||
// contract State Types
|
||||
val combinedContractTypeTypes = criteria.contractStateTypes?.plus(contractType) ?: setOf(contractType)
|
||||
combinedContractTypeTypes.filter { it.name != ContractState::class.java.name }.let {
|
||||
val interfaces = it.flatMap { contractTypeMappings[it.name] ?: emptyList() }
|
||||
val concrete = it.filter { !it.isInterface }.map { it.name }
|
||||
val all = interfaces.plus(concrete)
|
||||
if (all.isNotEmpty())
|
||||
predicateSet.add(criteriaBuilder.and(vaultStates.get<String>("contractStateClassName").`in`(all)))
|
||||
}
|
||||
|
||||
// soft locking
|
||||
if (!criteria.includeSoftlockedStates)
|
||||
predicateSet.add(criteriaBuilder.and(vaultStates.get<String>("lockId").isNull))
|
||||
|
||||
// notary names
|
||||
criteria.notaryName?.let {
|
||||
val notaryNames = (criteria.notaryName as List<X500Name>).map { it.toString() }
|
||||
predicateSet.add(criteriaBuilder.and(vaultStates.get<String>("notaryName").`in`(notaryNames)))
|
||||
}
|
||||
|
||||
// state references
|
||||
criteria.stateRefs?.let {
|
||||
val persistentStateRefs = (criteria.stateRefs as List<StateRef>).map { PersistentStateRef(it.txhash.bytes.toHexString(), it.index) }
|
||||
val compositeKey = vaultStates.get<PersistentStateRef>("stateRef")
|
||||
predicateSet.add(criteriaBuilder.and(compositeKey.`in`(persistentStateRefs)))
|
||||
}
|
||||
|
||||
// time constraints (recorded, consumed)
|
||||
criteria.timeCondition?.let {
|
||||
val timeCondition = criteria.timeCondition
|
||||
val timeInstantType = timeCondition!!.type
|
||||
val timeColumn = when (timeInstantType) {
|
||||
QueryCriteria.TimeInstantType.RECORDED -> Column.Kotlin(VaultSchemaV1.VaultStates::recordedTime)
|
||||
QueryCriteria.TimeInstantType.CONSUMED -> Column.Kotlin(VaultSchemaV1.VaultStates::consumedTime)
|
||||
}
|
||||
val expression = CriteriaExpression.ColumnPredicateExpression(timeColumn, timeCondition.predicate)
|
||||
predicateSet.add(expressionToPredicate(vaultStates, expression))
|
||||
}
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
private fun columnPredicateToPredicate(column: Path<out Any?>, columnPredicate: ColumnPredicate<*>): Predicate {
|
||||
return when (columnPredicate) {
|
||||
is ColumnPredicate.EqualityComparison -> {
|
||||
val literal = columnPredicate.rightLiteral
|
||||
when (columnPredicate.operator) {
|
||||
EqualityComparisonOperator.EQUAL -> criteriaBuilder.equal(column, literal)
|
||||
EqualityComparisonOperator.NOT_EQUAL -> criteriaBuilder.notEqual(column, literal)
|
||||
}
|
||||
}
|
||||
is ColumnPredicate.BinaryComparison -> {
|
||||
column as Path<Comparable<Any?>?>
|
||||
val literal = columnPredicate.rightLiteral as Comparable<Any?>?
|
||||
when (columnPredicate.operator) {
|
||||
BinaryComparisonOperator.GREATER_THAN -> criteriaBuilder.greaterThan(column, literal)
|
||||
BinaryComparisonOperator.GREATER_THAN_OR_EQUAL -> criteriaBuilder.greaterThanOrEqualTo(column, literal)
|
||||
BinaryComparisonOperator.LESS_THAN -> criteriaBuilder.lessThan(column, literal)
|
||||
BinaryComparisonOperator.LESS_THAN_OR_EQUAL -> criteriaBuilder.lessThanOrEqualTo(column, literal)
|
||||
}
|
||||
}
|
||||
is ColumnPredicate.Likeness -> {
|
||||
column as Path<String?>
|
||||
when (columnPredicate.operator) {
|
||||
LikenessOperator.LIKE -> criteriaBuilder.like(column, columnPredicate.rightLiteral)
|
||||
LikenessOperator.NOT_LIKE -> criteriaBuilder.notLike(column, columnPredicate.rightLiteral)
|
||||
}
|
||||
}
|
||||
is ColumnPredicate.CollectionExpression -> {
|
||||
when (columnPredicate.operator) {
|
||||
CollectionOperator.IN -> column.`in`(columnPredicate.rightLiteral)
|
||||
CollectionOperator.NOT_IN -> criteriaBuilder.not(column.`in`(columnPredicate.rightLiteral))
|
||||
}
|
||||
}
|
||||
is ColumnPredicate.Between -> {
|
||||
column as Path<Comparable<Any?>?>
|
||||
val fromLiteral = columnPredicate.rightFromLiteral as Comparable<Any?>?
|
||||
val toLiteral = columnPredicate.rightToLiteral as Comparable<Any?>?
|
||||
criteriaBuilder.between(column, fromLiteral, toLiteral)
|
||||
}
|
||||
is ColumnPredicate.NullExpression -> {
|
||||
when (columnPredicate.operator) {
|
||||
NullOperator.IS_NULL -> criteriaBuilder.isNull(column)
|
||||
NullOperator.NOT_NULL -> criteriaBuilder.isNotNull(column)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return : Expression<Boolean> -> : Predicate
|
||||
*/
|
||||
private fun <O, R> expressionToExpression(root: Root<O>, expression: CriteriaExpression<O, R>): Expression<R> {
|
||||
return when (expression) {
|
||||
is CriteriaExpression.BinaryLogical -> {
|
||||
val leftPredicate = expressionToExpression(root, expression.left)
|
||||
val rightPredicate = expressionToExpression(root, expression.right)
|
||||
when (expression.operator) {
|
||||
BinaryLogicalOperator.AND -> criteriaBuilder.and(leftPredicate, rightPredicate) as Expression<R>
|
||||
BinaryLogicalOperator.OR -> criteriaBuilder.or(leftPredicate, rightPredicate) as Expression<R>
|
||||
}
|
||||
}
|
||||
is CriteriaExpression.Not -> criteriaBuilder.not(expressionToExpression(root, expression.expression)) as Expression<R>
|
||||
is CriteriaExpression.ColumnPredicateExpression<O, *> -> {
|
||||
val column = root.get<Any?>(getColumnName(expression.column))
|
||||
columnPredicateToPredicate(column, expression.predicate) as Expression<R>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun <O> expressionToPredicate(root: Root<O>, expression: CriteriaExpression<O, Boolean>): Predicate {
|
||||
return expressionToExpression(root, expression) as Predicate
|
||||
}
|
||||
|
||||
override fun parseCriteria(criteria: QueryCriteria.FungibleAssetQueryCriteria) : Collection<Predicate> {
|
||||
log.trace { "Parsing FungibleAssetQueryCriteria: $criteria" }
|
||||
|
||||
var predicateSet = mutableSetOf<Predicate>()
|
||||
|
||||
val vaultFungibleStates = criteriaQuery.from(VaultSchemaV1.VaultFungibleStates::class.java)
|
||||
rootEntities.putIfAbsent(VaultSchemaV1.VaultFungibleStates::class.java, vaultFungibleStates)
|
||||
|
||||
val joinPredicate = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultFungibleStates.get<PersistentStateRef>("stateRef"))
|
||||
predicateSet.add(joinPredicate)
|
||||
|
||||
// owner
|
||||
criteria.owner?.let {
|
||||
val ownerKeys = criteria.owner as List<AbstractParty>
|
||||
val joinFungibleStateToParty = vaultFungibleStates.join<VaultSchemaV1.VaultFungibleStates, CommonSchemaV1.Party>("issuerParty")
|
||||
val owners = ownerKeys.map { it.nameOrNull()?.toString() ?: it.toString()}
|
||||
predicateSet.add(criteriaBuilder.and(joinFungibleStateToParty.get<CommonSchemaV1.Party>("name").`in`(owners)))
|
||||
}
|
||||
|
||||
// quantity
|
||||
criteria.quantity?.let {
|
||||
predicateSet.add(columnPredicateToPredicate(vaultFungibleStates.get<Long>("quantity"), it))
|
||||
}
|
||||
|
||||
// issuer party
|
||||
criteria.issuerPartyName?.let {
|
||||
val issuerParties = criteria.issuerPartyName as List<AbstractParty>
|
||||
val joinFungibleStateToParty = vaultFungibleStates.join<VaultSchemaV1.VaultFungibleStates, CommonSchemaV1.Party>("issuerParty")
|
||||
val dealPartyKeys = issuerParties.map { it.nameOrNull().toString() }
|
||||
predicateSet.add(criteriaBuilder.equal(joinFungibleStateToParty.get<CommonSchemaV1.Party>("name"), dealPartyKeys))
|
||||
}
|
||||
|
||||
// issuer reference
|
||||
criteria.issuerRef?.let {
|
||||
val issuerRefs = (criteria.issuerRef as List<OpaqueBytes>).map { it.bytes }
|
||||
predicateSet.add(criteriaBuilder.and(vaultFungibleStates.get<ByteArray>("issuerRef").`in`(issuerRefs)))
|
||||
}
|
||||
|
||||
// participants
|
||||
criteria.participants?.let {
|
||||
val participants = criteria.participants as List<AbstractParty>
|
||||
val joinFungibleStateToParty = vaultFungibleStates.join<VaultSchemaV1.VaultFungibleStates, CommonSchemaV1.Party>("participants")
|
||||
val participantKeys = participants.map { it.nameOrNull().toString() }
|
||||
predicateSet.add(criteriaBuilder.and(joinFungibleStateToParty.get<CommonSchemaV1.Party>("name").`in`(participantKeys)))
|
||||
criteriaQuery.distinct(true)
|
||||
}
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
override fun parseCriteria(criteria: QueryCriteria.LinearStateQueryCriteria) : Collection<Predicate> {
|
||||
log.trace { "Parsing LinearStateQueryCriteria: $criteria" }
|
||||
|
||||
val predicateSet = mutableSetOf<Predicate>()
|
||||
|
||||
val vaultLinearStates = criteriaQuery.from(VaultSchemaV1.VaultLinearStates::class.java)
|
||||
rootEntities.putIfAbsent(VaultSchemaV1.VaultLinearStates::class.java, vaultLinearStates)
|
||||
val joinPredicate = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultLinearStates.get<PersistentStateRef>("stateRef"))
|
||||
joinPredicates.add(joinPredicate)
|
||||
|
||||
// linear ids
|
||||
criteria.linearId?.let {
|
||||
val uniqueIdentifiers = criteria.linearId as List<UniqueIdentifier>
|
||||
val externalIds = uniqueIdentifiers.mapNotNull { it.externalId }
|
||||
if (externalIds.isNotEmpty())
|
||||
predicateSet.add(criteriaBuilder.and(vaultLinearStates.get<String>("externalId").`in`(externalIds)))
|
||||
predicateSet.add(criteriaBuilder.and(vaultLinearStates.get<UUID>("uuid").`in`(uniqueIdentifiers.map { it.id })))
|
||||
}
|
||||
|
||||
// deal refs
|
||||
criteria.dealRef?.let {
|
||||
val dealRefs = criteria.dealRef as List<String>
|
||||
predicateSet.add(criteriaBuilder.and(vaultLinearStates.get<String>("dealReference").`in`(dealRefs)))
|
||||
}
|
||||
|
||||
// deal participants
|
||||
criteria.participants?.let {
|
||||
val participants = criteria.participants as List<AbstractParty>
|
||||
val joinLinearStateToParty = vaultLinearStates.join<VaultSchemaV1.VaultLinearStates, CommonSchemaV1.Party>("participants")
|
||||
val participantKeys = participants.map { it.nameOrNull().toString() }
|
||||
predicateSet.add(criteriaBuilder.and(joinLinearStateToParty.get<CommonSchemaV1.Party>("name").`in`(participantKeys)))
|
||||
criteriaQuery.distinct(true)
|
||||
}
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
override fun <L : PersistentState> parseCriteria(criteria: QueryCriteria.VaultCustomQueryCriteria<L>): Collection<Predicate> {
|
||||
log.trace { "Parsing VaultCustomQueryCriteria: $criteria" }
|
||||
|
||||
val predicateSet = mutableSetOf<Predicate>()
|
||||
val entityClass = resolveEnclosingObjectFromExpression(criteria.expression)
|
||||
|
||||
try {
|
||||
val entityRoot = criteriaQuery.from(entityClass)
|
||||
rootEntities.putIfAbsent(entityClass, entityRoot)
|
||||
val joinPredicate = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), entityRoot.get<PersistentStateRef>("stateRef"))
|
||||
joinPredicates.add(joinPredicate)
|
||||
|
||||
predicateSet.add(expressionToPredicate(entityRoot, criteria.expression))
|
||||
}
|
||||
catch (e: Exception) {
|
||||
e.message?.let { message ->
|
||||
if (message.contains("Not an entity"))
|
||||
throw VaultQueryException("""
|
||||
Please register the entity '${entityClass.name.substringBefore('$')}' class in your CorDapp's CordaPluginRegistry configuration (requiredSchemas attribute)
|
||||
and ensure you have declared (in supportedSchemas()) and mapped (in generateMappedObject()) the schema in the associated contract state's QueryableState interface implementation.
|
||||
See https://docs.corda.net/persistence.html?highlight=persistence for more information""")
|
||||
}
|
||||
throw VaultQueryException("Parsing error: ${e.message}")
|
||||
}
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
override fun parseOr(left: QueryCriteria, right: QueryCriteria): Collection<Predicate> {
|
||||
log.trace { "Parsing OR QueryCriteria composition: $left OR $right" }
|
||||
|
||||
var predicateSet = mutableSetOf<Predicate>()
|
||||
val leftPredicates = parse(left)
|
||||
val rightPredicates = parse(right)
|
||||
|
||||
val orPredicate = criteriaBuilder.or(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())
|
||||
predicateSet.add(orPredicate)
|
||||
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
override fun parseAnd(left: QueryCriteria, right: QueryCriteria): Collection<Predicate> {
|
||||
log.trace { "Parsing AND QueryCriteria composition: $left AND $right" }
|
||||
|
||||
var predicateSet = mutableSetOf<Predicate>()
|
||||
val leftPredicates = parse(left)
|
||||
val rightPredicates = parse(right)
|
||||
|
||||
val andPredicate = criteriaBuilder.and(criteriaBuilder.and(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray()))
|
||||
predicateSet.add(andPredicate)
|
||||
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
override fun parse(criteria: QueryCriteria, sorting: Sort?): Collection<Predicate> {
|
||||
val predicateSet = criteria.visit(this)
|
||||
|
||||
sorting?.let {
|
||||
if (sorting.columns.isNotEmpty())
|
||||
parse(sorting)
|
||||
}
|
||||
|
||||
val selections = listOf(vaultStates).plus(rootEntities.map { it.value })
|
||||
criteriaQuery.multiselect(selections)
|
||||
val combinedPredicates = joinPredicates.plus(predicateSet)
|
||||
criteriaQuery.where(*combinedPredicates.toTypedArray())
|
||||
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
private fun parse(sorting: Sort) {
|
||||
log.trace { "Parsing sorting specification: $sorting" }
|
||||
|
||||
var orderCriteria = mutableListOf<Order>()
|
||||
|
||||
sorting.columns.map { (sortAttribute, direction) ->
|
||||
val (entityStateClass, entityStateColumnName) =
|
||||
when(sortAttribute) {
|
||||
is SortAttribute.Standard -> parse(sortAttribute.attribute)
|
||||
is SortAttribute.Custom -> Pair(sortAttribute.entityStateClass, sortAttribute.entityStateColumnName)
|
||||
}
|
||||
val sortEntityRoot =
|
||||
rootEntities.getOrElse(entityStateClass) {
|
||||
// scenario where sorting on attributes not parsed as criteria
|
||||
val entityRoot = criteriaQuery.from(entityStateClass)
|
||||
rootEntities.put(entityStateClass, entityRoot)
|
||||
val joinPredicate = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), entityRoot.get<PersistentStateRef>("stateRef"))
|
||||
joinPredicates.add(joinPredicate)
|
||||
entityRoot
|
||||
}
|
||||
when (direction) {
|
||||
Sort.Direction.ASC -> {
|
||||
orderCriteria.add(criteriaBuilder.asc(sortEntityRoot.get<String>(entityStateColumnName)))
|
||||
}
|
||||
Sort.Direction.DESC ->
|
||||
orderCriteria.add(criteriaBuilder.desc(sortEntityRoot.get<String>(entityStateColumnName)))
|
||||
}
|
||||
}
|
||||
if (orderCriteria.isNotEmpty()) {
|
||||
criteriaQuery.orderBy(orderCriteria)
|
||||
criteriaQuery.where(*joinPredicates.toTypedArray())
|
||||
}
|
||||
}
|
||||
|
||||
private fun parse(sortAttribute: Sort.Attribute): Pair<Class<out PersistentState>, String> {
|
||||
val entityClassAndColumnName : Pair<Class<out PersistentState>, String> =
|
||||
when(sortAttribute) {
|
||||
is Sort.VaultStateAttribute -> {
|
||||
Pair(VaultSchemaV1.VaultStates::class.java, sortAttribute.columnName)
|
||||
}
|
||||
is Sort.LinearStateAttribute -> {
|
||||
Pair(VaultSchemaV1.VaultLinearStates::class.java, sortAttribute.columnName)
|
||||
}
|
||||
is Sort.FungibleStateAttribute -> {
|
||||
Pair(VaultSchemaV1.VaultFungibleStates::class.java, sortAttribute.columnName)
|
||||
}
|
||||
else -> throw VaultQueryException("Invalid sort attribute: $sortAttribute")
|
||||
}
|
||||
return entityClassAndColumnName
|
||||
}
|
||||
}
|
@ -0,0 +1,148 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.bufferUntilSubscribed
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultQueryException
|
||||
import net.corda.core.node.services.VaultQueryService
|
||||
import net.corda.core.node.services.vault.MAX_PAGE_SIZE
|
||||
import net.corda.core.node.services.vault.PageSpecification
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.Sort
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.storageKryo
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.services.vault.schemas.jpa.VaultSchemaV1
|
||||
import net.corda.node.utilities.wrapWithDatabaseTransaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import rx.subjects.PublishSubject
|
||||
import java.lang.Exception
|
||||
import javax.persistence.EntityManager
|
||||
import javax.persistence.Tuple
|
||||
|
||||
|
||||
class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
val updatesPublisher: PublishSubject<Vault.Update>) : SingletonSerializeAsToken(), VaultQueryService {
|
||||
|
||||
companion object {
|
||||
val log = loggerFor<HibernateVaultQueryImpl>()
|
||||
}
|
||||
|
||||
private val sessionFactory = hibernateConfig.sessionFactoryForRegisteredSchemas()
|
||||
private val criteriaBuilder = sessionFactory.criteriaBuilder
|
||||
|
||||
@Throws(VaultQueryException::class)
|
||||
override fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out ContractState>): Vault.Page<T> {
|
||||
log.info("Vault Query for contract type: $contractType, criteria: $criteria, pagination: $paging, sorting: $sorting")
|
||||
|
||||
val session = sessionFactory.withOptions().
|
||||
connection(TransactionManager.current().connection).
|
||||
openSession()
|
||||
|
||||
session.use {
|
||||
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
|
||||
val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
|
||||
val contractTypeMappings = resolveUniqueContractStateTypes(session)
|
||||
// TODO: revisit (use single instance of parser for all queries)
|
||||
val criteriaParser = HibernateQueryCriteriaParser(contractType, contractTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates)
|
||||
|
||||
try {
|
||||
// parse criteria and build where predicates
|
||||
criteriaParser.parse(criteria, sorting)
|
||||
|
||||
// prepare query for execution
|
||||
val query = session.createQuery(criteriaQuery)
|
||||
|
||||
// pagination
|
||||
if (paging.pageNumber < 0) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from 0]")
|
||||
if (paging.pageSize < 0 || paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum page size is ${MAX_PAGE_SIZE}]")
|
||||
|
||||
// count total results available
|
||||
val countQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
countQuery.select(criteriaBuilder.count(countQuery.from(VaultSchemaV1.VaultStates::class.java)))
|
||||
val totalStates = session.createQuery(countQuery).singleResult.toInt()
|
||||
|
||||
if ((paging.pageNumber != 0) && (paging.pageSize * paging.pageNumber >= totalStates))
|
||||
throw VaultQueryException("Requested more results than available [${paging.pageSize} * ${paging.pageNumber} >= ${totalStates}]")
|
||||
|
||||
query.firstResult = paging.pageNumber * paging.pageSize
|
||||
query.maxResults = paging.pageSize
|
||||
|
||||
// execution
|
||||
val results = query.resultList
|
||||
val statesAndRefs: MutableList<StateAndRef<*>> = mutableListOf()
|
||||
val statesMeta: MutableList<Vault.StateMetadata> = mutableListOf()
|
||||
|
||||
results.asSequence()
|
||||
.forEach { it ->
|
||||
val it = it[0] as VaultSchemaV1.VaultStates
|
||||
val stateRef = StateRef(SecureHash.parse(it.stateRef!!.txId!!), it.stateRef!!.index!!)
|
||||
val state = it.contractState.deserialize<TransactionState<T>>(storageKryo())
|
||||
statesMeta.add(Vault.StateMetadata(stateRef, it.contractStateClassName, it.recordedTime, it.consumedTime, it.stateStatus, it.notaryName, it.notaryKey, it.lockId, it.lockUpdateTime))
|
||||
statesAndRefs.add(StateAndRef(state, stateRef))
|
||||
}
|
||||
|
||||
return Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, pageable = paging, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates) as Vault.Page<T>
|
||||
|
||||
} catch (e: Exception) {
|
||||
log.error(e.message)
|
||||
throw e.cause ?: e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox ({ updatesPublisher })
|
||||
|
||||
@Throws(VaultQueryException::class)
|
||||
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out ContractState>): Vault.PageAndUpdates<T> {
|
||||
return mutex.locked {
|
||||
val snapshotResults = _queryBy<T>(criteria, paging, sorting, contractType)
|
||||
Vault.PageAndUpdates(snapshotResults,
|
||||
updatesPublisher.bufferUntilSubscribed()
|
||||
.filter { it.containsType(contractType, snapshotResults.stateTypes) } )
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Maintain a list of contract state interfaces to concrete types stored in the vault
|
||||
* for usage in generic queries of type queryBy<LinearState> or queryBy<FungibleState<*>>
|
||||
*/
|
||||
fun resolveUniqueContractStateTypes(session: EntityManager) : Map<String, List<String>> {
|
||||
val criteria = criteriaBuilder.createQuery(String::class.java)
|
||||
val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java)
|
||||
criteria.select(vaultStates.get("contractStateClassName")).distinct(true)
|
||||
val query = session.createQuery(criteria)
|
||||
val results = query.resultList
|
||||
val distinctTypes = results.map { it }
|
||||
|
||||
val contractInterfaceToConcreteTypes = mutableMapOf<String, MutableList<String>>()
|
||||
distinctTypes.forEach { it ->
|
||||
val concreteType = Class.forName(it) as Class<ContractState>
|
||||
val contractInterfaces = deriveContractInterfaces(concreteType)
|
||||
contractInterfaces.map {
|
||||
val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name, { mutableListOf() })
|
||||
contractInterface.add(concreteType.name)
|
||||
}
|
||||
}
|
||||
return contractInterfaceToConcreteTypes
|
||||
}
|
||||
|
||||
private fun <T: ContractState> deriveContractInterfaces(clazz: Class<T>): Set<Class<T>> {
|
||||
val myInterfaces: MutableSet<Class<T>> = mutableSetOf()
|
||||
clazz.interfaces.forEach {
|
||||
if (!it.equals(ContractState::class.java)) {
|
||||
myInterfaces.add(it as Class<T>)
|
||||
myInterfaces.addAll(deriveContractInterfaces(it))
|
||||
}
|
||||
}
|
||||
return myInterfaces
|
||||
}
|
||||
}
|
@ -20,13 +20,7 @@ import net.corda.core.crypto.toBase58String
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.StatesNotAvailableException
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.node.services.vault.PageSpecification
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.Sort
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.tee
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
@ -35,7 +29,7 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.vault.schemas.*
|
||||
import net.corda.node.services.vault.schemas.requery.*
|
||||
import net.corda.node.utilities.bufferUntilDatabaseCommit
|
||||
import net.corda.node.utilities.wrapWithDatabaseTransaction
|
||||
import rx.Observable
|
||||
@ -173,6 +167,9 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
override val updates: Observable<Vault.Update>
|
||||
get() = mutex.locked { _updatesInDbTx }
|
||||
|
||||
override val updatesPublisher: PublishSubject<Vault.Update>
|
||||
get() = mutex.locked { _updatesPublisher }
|
||||
|
||||
override fun track(): Pair<Vault<ContractState>, Observable<Vault.Update>> {
|
||||
return mutex.locked {
|
||||
Pair(Vault(unconsumedStates<ContractState>()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
@ -222,26 +219,6 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
return stateAndRefs.associateBy({ it.ref }, { it.state })
|
||||
}
|
||||
|
||||
override fun <T : ContractState> queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort): Vault.Page<T> {
|
||||
|
||||
TODO("Under construction")
|
||||
|
||||
// If [VaultQueryCriteria.PageSpecification] specified
|
||||
// must return (CloseableIterator) result.get().iterator(skip, take)
|
||||
// where
|
||||
// skip = Max[(pageNumber - 1),0] * pageSize
|
||||
// take = pageSize
|
||||
}
|
||||
|
||||
override fun <T : ContractState> trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort): Vault.PageAndUpdates<T> {
|
||||
TODO("Under construction")
|
||||
|
||||
// return mutex.locked {
|
||||
// Vault.PageAndUpdates(queryBy(criteria),
|
||||
// _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
// }
|
||||
}
|
||||
|
||||
override fun notifyAll(txns: Iterable<WireTransaction>) {
|
||||
val ourKeys = services.keyManagementService.keys
|
||||
val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, ourKeys) }
|
||||
|
@ -0,0 +1,134 @@
|
||||
package net.corda.node.services.vault.schemas.jpa
|
||||
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.AnonymousParty
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.serialization.OpaqueBytes
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.persistence.*
|
||||
|
||||
/**
|
||||
* JPA representation of the core Vault Schema
|
||||
*/
|
||||
object VaultSchema
|
||||
|
||||
/**
|
||||
* First version of the Vault ORM schema
|
||||
*/
|
||||
object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, version = 1,
|
||||
mappedTypes = listOf(VaultStates::class.java, VaultLinearStates::class.java, VaultFungibleStates::class.java, CommonSchemaV1.Party::class.java)) {
|
||||
@Entity
|
||||
@Table(name = "vault_states",
|
||||
indexes = arrayOf(Index(name = "state_status_idx", columnList = "state_status")))
|
||||
class VaultStates(
|
||||
/** refers to the notary a state is attached to */
|
||||
@Column(name = "notary_name")
|
||||
var notaryName: String,
|
||||
|
||||
@Column(name = "notary_key", length = 65535) // TODO What is the upper limit on size of CompositeKey?
|
||||
var notaryKey: String,
|
||||
|
||||
/** references a concrete ContractState that is [QueryableState] and has a [MappedSchema] */
|
||||
@Column(name = "contract_state_class_name")
|
||||
var contractStateClassName: String,
|
||||
|
||||
/** refers to serialized transaction Contract State */
|
||||
// TODO: define contract state size maximum size and adjust length accordingly
|
||||
@Column(name = "contract_state", length = 100000)
|
||||
var contractState: ByteArray,
|
||||
|
||||
/** state lifecycle: unconsumed, consumed */
|
||||
@Column(name = "state_status")
|
||||
var stateStatus: Vault.StateStatus,
|
||||
|
||||
/** refers to timestamp recorded upon entering UNCONSUMED state */
|
||||
@Column(name = "recorded_timestamp")
|
||||
var recordedTime: Instant,
|
||||
|
||||
/** refers to timestamp recorded upon entering CONSUMED state */
|
||||
@Column(name = "consumed_timestamp", nullable = true)
|
||||
var consumedTime: Instant?,
|
||||
|
||||
/** used to denote a state has been soft locked (to prevent double spend)
|
||||
* will contain a temporary unique [UUID] obtained from a flow session */
|
||||
@Column(name = "lock_id", nullable = true)
|
||||
var lockId: String,
|
||||
|
||||
/** refers to the last time a lock was taken (reserved) or updated (released, re-reserved) */
|
||||
@Column(name = "lock_timestamp", nullable = true)
|
||||
var lockUpdateTime: Instant?
|
||||
) : PersistentState()
|
||||
|
||||
@Entity
|
||||
@Table(name = "vault_linear_states",
|
||||
indexes = arrayOf(Index(name = "external_id_index", columnList = "external_id"),
|
||||
Index(name = "uuid_index", columnList = "uuid"),
|
||||
Index(name = "deal_reference_index", columnList = "deal_reference")))
|
||||
class VaultLinearStates(
|
||||
/** [ContractState] attributes */
|
||||
@OneToMany(cascade = arrayOf(CascadeType.ALL))
|
||||
var participants: Set<CommonSchemaV1.Party>,
|
||||
|
||||
/**
|
||||
* Represents a [LinearState] [UniqueIdentifier]
|
||||
*/
|
||||
@Column(name = "external_id")
|
||||
var externalId: String?,
|
||||
|
||||
@Column(name = "uuid", nullable = false)
|
||||
var uuid: UUID,
|
||||
|
||||
// TODO: DealState to be deprecated (collapsed into LinearState)
|
||||
|
||||
/** Deal State attributes **/
|
||||
@Column(name = "deal_reference")
|
||||
var dealReference: String
|
||||
) : PersistentState() {
|
||||
constructor(uid: UniqueIdentifier, _dealReference: String, _participants: List<AbstractParty>) :
|
||||
this(externalId = uid.externalId,
|
||||
uuid = uid.id,
|
||||
dealReference = _dealReference,
|
||||
participants = _participants.map{ CommonSchemaV1.Party(it) }.toSet() )
|
||||
}
|
||||
|
||||
@Entity
|
||||
@Table(name = "vault_fungible_states")
|
||||
class VaultFungibleStates(
|
||||
/** [ContractState] attributes */
|
||||
@OneToMany(cascade = arrayOf(CascadeType.ALL))
|
||||
var participants: Set<CommonSchemaV1.Party>,
|
||||
|
||||
/** [OwnableState] attributes */
|
||||
@OneToOne(cascade = arrayOf(CascadeType.ALL))
|
||||
var owner: CommonSchemaV1.Party,
|
||||
|
||||
/** [FungibleAsset] attributes
|
||||
*
|
||||
* Note: the underlying Product being issued must be modelled into the
|
||||
* custom contract itself (eg. see currency in Cash contract state)
|
||||
*/
|
||||
|
||||
/** Amount attributes */
|
||||
@Column(name = "quantity")
|
||||
var quantity: Long,
|
||||
|
||||
/** Issuer attributes */
|
||||
@OneToOne(cascade = arrayOf(CascadeType.ALL))
|
||||
var issuerParty: CommonSchemaV1.Party,
|
||||
|
||||
@Column(name = "issuer_reference")
|
||||
var issuerRef: ByteArray
|
||||
) : PersistentState() {
|
||||
constructor(_owner: AbstractParty, _quantity: Long, _issuerParty: AbstractParty, _issuerRef: OpaqueBytes, _participants: List<AbstractParty>) :
|
||||
this(owner = CommonSchemaV1.Party(_owner),
|
||||
quantity = _quantity,
|
||||
issuerParty = CommonSchemaV1.Party(_issuerParty),
|
||||
issuerRef = _issuerRef.bytes,
|
||||
participants = _participants.map { CommonSchemaV1.Party(it) }.toSet())
|
||||
}
|
||||
}
|
@ -1,86 +1,88 @@
|
||||
package net.corda.node.services.vault;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import kotlin.Pair;
|
||||
import net.corda.contracts.DealState;
|
||||
import net.corda.contracts.asset.Cash;
|
||||
import com.google.common.collect.*;
|
||||
import kotlin.*;
|
||||
import net.corda.contracts.*;
|
||||
import net.corda.contracts.asset.*;
|
||||
import net.corda.core.contracts.*;
|
||||
import net.corda.core.crypto.SecureHash;
|
||||
import net.corda.core.node.services.Vault;
|
||||
import net.corda.core.node.services.VaultService;
|
||||
import net.corda.core.node.services.vault.PageSpecification;
|
||||
import net.corda.core.node.services.vault.QueryCriteria;
|
||||
import net.corda.core.node.services.vault.QueryCriteria.LinearStateQueryCriteria;
|
||||
import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria;
|
||||
import net.corda.core.node.services.vault.Sort;
|
||||
import net.corda.core.serialization.OpaqueBytes;
|
||||
import net.corda.core.transactions.SignedTransaction;
|
||||
import net.corda.core.transactions.WireTransaction;
|
||||
import net.corda.node.services.vault.schemas.VaultLinearStateEntity;
|
||||
import net.corda.testing.node.MockServices;
|
||||
import org.bouncycastle.asn1.x500.X500Name;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.exposed.sql.Database;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import net.corda.core.crypto.*;
|
||||
import net.corda.core.identity.*;
|
||||
import net.corda.core.node.services.*;
|
||||
import net.corda.core.node.services.vault.*;
|
||||
import net.corda.core.node.services.vault.QueryCriteria.*;
|
||||
import net.corda.core.schemas.*;
|
||||
import net.corda.core.serialization.*;
|
||||
import net.corda.core.transactions.*;
|
||||
import net.corda.node.services.database.*;
|
||||
import net.corda.node.services.schema.*;
|
||||
import net.corda.schemas.*;
|
||||
import net.corda.testing.node.*;
|
||||
import org.jetbrains.annotations.*;
|
||||
import org.jetbrains.exposed.sql.*;
|
||||
import org.junit.*;
|
||||
import rx.Observable;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.*;
|
||||
import java.lang.reflect.*;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
import java.util.stream.*;
|
||||
|
||||
import static net.corda.contracts.asset.CashKt.getDUMMY_CASH_ISSUER;
|
||||
import static net.corda.contracts.asset.CashKt.getDUMMY_CASH_ISSUER_KEY;
|
||||
import static net.corda.contracts.asset.CashKt.*;
|
||||
import static net.corda.contracts.testing.VaultFiller.*;
|
||||
import static net.corda.core.node.services.vault.QueryCriteriaKt.and;
|
||||
import static net.corda.core.node.services.vault.QueryCriteriaUtilsKt.getMAX_PAGE_SIZE;
|
||||
import static net.corda.core.utilities.TestConstants.getDUMMY_NOTARY;
|
||||
import static net.corda.node.utilities.DatabaseSupportKt.configureDatabase;
|
||||
import static net.corda.core.node.services.vault.QueryCriteriaKt.*;
|
||||
import static net.corda.core.node.services.vault.QueryCriteriaUtilsKt.*;
|
||||
import static net.corda.core.utilities.TestConstants.*;
|
||||
import static net.corda.node.utilities.DatabaseSupportKt.*;
|
||||
import static net.corda.node.utilities.DatabaseSupportKt.transaction;
|
||||
import static net.corda.testing.CoreTestUtils.getMEGA_CORP;
|
||||
import static net.corda.testing.node.MockServicesKt.makeTestDataSourceProperties;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static net.corda.testing.CoreTestUtils.*;
|
||||
import static net.corda.testing.node.MockServicesKt.*;
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
@Ignore
|
||||
public class VaultQueryJavaTests {
|
||||
|
||||
private MockServices services;
|
||||
private VaultService vaultSvc;
|
||||
VaultService vaultSvc;
|
||||
private VaultQueryService vaultQuerySvc;
|
||||
private Closeable dataSource;
|
||||
private Database database;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
||||
Properties dataSourceProps = makeTestDataSourceProperties(SecureHash.randomSHA256().toString());
|
||||
Pair<Closeable, Database> dataSourceAndDatabase = configureDatabase(dataSourceProps);
|
||||
dataSource = dataSourceAndDatabase.getFirst();
|
||||
database = dataSourceAndDatabase.getSecond();
|
||||
|
||||
transaction(database, statement -> services = new MockServices() {
|
||||
@NotNull
|
||||
@Override
|
||||
public VaultService getVaultService() {
|
||||
return makeVaultService(dataSourceProps);
|
||||
}
|
||||
Set<MappedSchema> customSchemas = new HashSet<>(Arrays.asList(DummyLinearStateSchemaV1.INSTANCE));
|
||||
HibernateConfiguration hibernateConfig = new HibernateConfiguration(new NodeSchemaService(customSchemas));
|
||||
transaction(database,
|
||||
statement -> { services = new MockServices(getMEGA_CORP_KEY()) {
|
||||
@NotNull
|
||||
@Override
|
||||
public VaultService getVaultService() {
|
||||
return makeVaultService(dataSourceProps, hibernateConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordTransactions(@NotNull Iterable<SignedTransaction> txs) {
|
||||
for (SignedTransaction stx : txs ) {
|
||||
getStorageService().getValidatedTransactions().addTransaction(stx);
|
||||
}
|
||||
@Override
|
||||
public VaultQueryService getVaultQueryService() {
|
||||
return new HibernateVaultQueryImpl(hibernateConfig, getVaultService().getUpdatesPublisher());
|
||||
}
|
||||
|
||||
Stream<WireTransaction> wtxn = StreamSupport.stream(txs.spliterator(), false).map(txn -> txn.getTx());
|
||||
getVaultService().notifyAll(wtxn.collect(Collectors.toList()));
|
||||
}
|
||||
@Override
|
||||
public void recordTransactions(@NotNull Iterable<SignedTransaction> txs) {
|
||||
for (SignedTransaction stx : txs) {
|
||||
getStorageService().getValidatedTransactions().addTransaction(stx);
|
||||
}
|
||||
|
||||
Stream<WireTransaction> wtxn = StreamSupport.stream(txs.spliterator(), false).map(txn -> txn.getTx());
|
||||
getVaultService().notifyAll(wtxn.collect(Collectors.toList()));
|
||||
}
|
||||
};
|
||||
vaultSvc = services.getVaultService();
|
||||
vaultQuerySvc = services.getVaultQueryService();
|
||||
|
||||
return services;
|
||||
});
|
||||
|
||||
vaultSvc = services.getVaultService();
|
||||
}
|
||||
|
||||
@After
|
||||
@ -97,8 +99,27 @@ public class VaultQueryJavaTests {
|
||||
*/
|
||||
|
||||
@Test
|
||||
public void consumedStates() {
|
||||
public void unconsumedLinearStates() throws VaultQueryException {
|
||||
transaction(database, tx -> {
|
||||
|
||||
fillWithSomeTestLinearStates(services, 3);
|
||||
|
||||
// DOCSTART VaultJavaQueryExample0
|
||||
Vault.Page<LinearState> results = vaultQuerySvc.queryBy(LinearState.class);
|
||||
// DOCEND VaultJavaQueryExample0
|
||||
|
||||
assertThat(results.getStates()).hasSize(3);
|
||||
|
||||
return tx;
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void consumedCashStates() {
|
||||
transaction(database, tx -> {
|
||||
|
||||
Amount<Currency> amount = new Amount<>(100, Currency.getInstance("USD"));
|
||||
|
||||
fillWithSomeTestCash(services,
|
||||
new Amount<>(100, Currency.getInstance("USD")),
|
||||
getDUMMY_NOTARY(),
|
||||
@ -110,13 +131,11 @@ public class VaultQueryJavaTests {
|
||||
getDUMMY_CASH_ISSUER(),
|
||||
getDUMMY_CASH_ISSUER_KEY() );
|
||||
|
||||
// DOCSTART VaultJavaQueryExample1
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(Cash.State.class));
|
||||
Vault.StateStatus status = Vault.StateStatus.CONSUMED;
|
||||
consumeCash(services, amount);
|
||||
|
||||
VaultQueryCriteria criteria = new VaultQueryCriteria(status, null, contractStateTypes);
|
||||
Vault.Page<ContractState> results = vaultSvc.queryBy(criteria);
|
||||
// DOCSTART VaultJavaQueryExample1
|
||||
VaultQueryCriteria criteria = new VaultQueryCriteria(Vault.StateStatus.CONSUMED);
|
||||
Vault.Page<Cash.State> results = vaultQuerySvc.queryBy(Cash.State.class, criteria);
|
||||
// DOCEND VaultJavaQueryExample1
|
||||
|
||||
assertThat(results.getStates()).hasSize(3);
|
||||
@ -126,32 +145,38 @@ public class VaultQueryJavaTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void consumedDealStatesPagedSorted() {
|
||||
public void consumedDealStatesPagedSorted() throws VaultQueryException {
|
||||
transaction(database, tx -> {
|
||||
|
||||
UniqueIdentifier uid = new UniqueIdentifier();
|
||||
fillWithSomeTestLinearStates(services, 10, uid);
|
||||
Vault<LinearState> states = fillWithSomeTestLinearStates(services, 10, null);
|
||||
StateAndRef<LinearState> linearState = states.getStates().iterator().next();
|
||||
UniqueIdentifier uid = linearState.component1().getData().getLinearId();
|
||||
|
||||
List<String> dealIds = Arrays.asList("123", "456", "789");
|
||||
fillWithSomeTestDeals(services, dealIds);
|
||||
Vault<DealState> dealStates = fillWithSomeTestDeals(services, dealIds);
|
||||
|
||||
// consume states
|
||||
consumeDeals(services, (List<? extends StateAndRef<? extends DealState>>) dealStates.getStates());
|
||||
consumeLinearStates(services, Arrays.asList(linearState));
|
||||
|
||||
// DOCSTART VaultJavaQueryExample2
|
||||
Vault.StateStatus status = Vault.StateStatus.CONSUMED;
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(Cash.State.class));
|
||||
Set<Class<LinearState>> contractStateTypes = new HashSet(Collections.singletonList(LinearState.class));
|
||||
|
||||
QueryCriteria vaultCriteria = new VaultQueryCriteria(status, null, contractStateTypes);
|
||||
QueryCriteria vaultCriteria = new VaultQueryCriteria(status, contractStateTypes);
|
||||
|
||||
List<UniqueIdentifier> linearIds = Arrays.asList(uid);
|
||||
List<X500Name> dealPartyNames = Arrays.asList(getMEGA_CORP().getName());
|
||||
QueryCriteria dealCriteriaAll = new LinearStateQueryCriteria(linearIds, false, dealIds, dealPartyNames);
|
||||
QueryCriteria linearCriteriaAll = new LinearStateQueryCriteria(null, linearIds);
|
||||
QueryCriteria dealCriteriaAll = new LinearStateQueryCriteria(null, null, dealIds);
|
||||
|
||||
QueryCriteria compositeCriteria = and(dealCriteriaAll, vaultCriteria);
|
||||
QueryCriteria compositeCriteria1 = or(dealCriteriaAll, linearCriteriaAll);
|
||||
QueryCriteria compositeCriteria2 = and(vaultCriteria, compositeCriteria1);
|
||||
|
||||
PageSpecification pageSpec = new PageSpecification(0, getMAX_PAGE_SIZE());
|
||||
Sort.SortColumn sortByUid = new Sort.SortColumn(VaultLinearStateEntity.UUID.getName(), Sort.Direction.DESC, Sort.NullHandling.NULLS_LAST);
|
||||
Sort.SortColumn sortByUid = new Sort.SortColumn(new SortAttribute.Standard(Sort.LinearStateAttribute.UUID), Sort.Direction.DESC);
|
||||
Sort sorting = new Sort(ImmutableSet.of(sortByUid));
|
||||
Vault.Page<ContractState> results = vaultSvc.queryBy(compositeCriteria, pageSpec, sorting);
|
||||
Vault.Page<LinearState> results = vaultQuerySvc.queryBy(LinearState.class, compositeCriteria2, pageSpec, sorting);
|
||||
// DOCEND VaultJavaQueryExample2
|
||||
|
||||
assertThat(results.getStates()).hasSize(4);
|
||||
@ -160,13 +185,52 @@ public class VaultQueryJavaTests {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customQueryForCashStatesWithAmountOfCurrencyGreaterOrEqualThanQuantity() {
|
||||
transaction(database, tx -> {
|
||||
|
||||
Amount<Currency> pounds = new Amount<>(100, Currency.getInstance("GBP"));
|
||||
Amount<Currency> dollars100 = new Amount<>(100, Currency.getInstance("USD"));
|
||||
Amount<Currency> dollars10 = new Amount<>(10, Currency.getInstance("USD"));
|
||||
Amount<Currency> dollars1 = new Amount<>(1, Currency.getInstance("USD"));
|
||||
|
||||
fillWithSomeTestCash(services, pounds, getDUMMY_NOTARY(), 1, 1, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER(), getDUMMY_CASH_ISSUER_KEY());
|
||||
fillWithSomeTestCash(services, dollars100, getDUMMY_NOTARY(), 1, 1, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER(), getDUMMY_CASH_ISSUER_KEY());
|
||||
fillWithSomeTestCash(services, dollars10, getDUMMY_NOTARY(), 1, 1, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER(), getDUMMY_CASH_ISSUER_KEY());
|
||||
fillWithSomeTestCash(services, dollars1, getDUMMY_NOTARY(), 1, 1, new Random(0L), new OpaqueBytes("1".getBytes()), null, getDUMMY_CASH_ISSUER(), getDUMMY_CASH_ISSUER_KEY());
|
||||
|
||||
try {
|
||||
// DOCSTART VaultJavaQueryExample3
|
||||
QueryCriteria generalCriteria = new VaultQueryCriteria(Vault.StateStatus.ALL);
|
||||
|
||||
Field attributeCurrency = CashSchemaV1.PersistentCashState.class.getDeclaredField("currency");
|
||||
Field attributeQuantity = CashSchemaV1.PersistentCashState.class.getDeclaredField("pennies");
|
||||
|
||||
CriteriaExpression currencyIndex = Builder.INSTANCE.equal(attributeCurrency, "USD");
|
||||
CriteriaExpression quantityIndex = Builder.INSTANCE.greaterThanOrEqual(attributeQuantity, 10L);
|
||||
|
||||
QueryCriteria customCriteria2 = new VaultCustomQueryCriteria(quantityIndex);
|
||||
QueryCriteria customCriteria1 = new VaultCustomQueryCriteria(currencyIndex);
|
||||
|
||||
|
||||
QueryCriteria criteria = QueryCriteriaKt.and(QueryCriteriaKt.and(generalCriteria, customCriteria1), customCriteria2);
|
||||
Vault.Page<ContractState> results = vaultQuerySvc.queryBy(Cash.State.class, criteria);
|
||||
// DOCEND VaultJavaQueryExample3
|
||||
|
||||
assertThat(results.getStates()).hasSize(2);
|
||||
} catch (NoSuchFieldException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return tx;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Dynamic trackBy() tests
|
||||
*/
|
||||
|
||||
@Test
|
||||
public void trackCashStates() {
|
||||
|
||||
transaction(database, tx -> {
|
||||
fillWithSomeTestCash(services,
|
||||
new Amount<>(100, Currency.getInstance("USD")),
|
||||
@ -179,17 +243,17 @@ public class VaultQueryJavaTests {
|
||||
getDUMMY_CASH_ISSUER(),
|
||||
getDUMMY_CASH_ISSUER_KEY() );
|
||||
|
||||
// DOCSTART VaultJavaQueryExample1
|
||||
// DOCSTART VaultJavaQueryExample4
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(Cash.State.class));
|
||||
|
||||
VaultQueryCriteria criteria = new VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, null, contractStateTypes);
|
||||
Vault.PageAndUpdates<ContractState> results = vaultSvc.trackBy(criteria);
|
||||
VaultQueryCriteria criteria = new VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, contractStateTypes);
|
||||
Vault.PageAndUpdates<ContractState> results = vaultQuerySvc.trackBy(ContractState.class, criteria);
|
||||
|
||||
Vault.Page<ContractState> snapshot = results.getCurrent();
|
||||
Observable<Vault.Update> updates = results.getFuture();
|
||||
|
||||
// DOCEND VaultJavaQueryExample1
|
||||
// DOCEND VaultJavaQueryExample4
|
||||
assertThat(snapshot.getStates()).hasSize(3);
|
||||
|
||||
return tx;
|
||||
@ -200,31 +264,36 @@ public class VaultQueryJavaTests {
|
||||
public void trackDealStatesPagedSorted() {
|
||||
transaction(database, tx -> {
|
||||
|
||||
UniqueIdentifier uid = new UniqueIdentifier();
|
||||
fillWithSomeTestLinearStates(services, 10, uid);
|
||||
Vault<LinearState> states = fillWithSomeTestLinearStates(services, 10, null);
|
||||
UniqueIdentifier uid = states.getStates().iterator().next().component1().getData().getLinearId();
|
||||
|
||||
List<String> dealIds = Arrays.asList("123", "456", "789");
|
||||
fillWithSomeTestDeals(services, dealIds);
|
||||
|
||||
// DOCSTART VaultJavaQueryExample2
|
||||
// DOCSTART VaultJavaQueryExample5
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(DealState.class));
|
||||
QueryCriteria vaultCriteria = new VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, null, contractStateTypes);
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Arrays.asList(DealState.class, LinearState.class));
|
||||
QueryCriteria vaultCriteria = new VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, contractStateTypes);
|
||||
|
||||
List<UniqueIdentifier> linearIds = Arrays.asList(uid);
|
||||
List<X500Name> dealPartyNames = Arrays.asList(getMEGA_CORP().getName());
|
||||
QueryCriteria dealCriteriaAll = new LinearStateQueryCriteria(linearIds, false, dealIds, dealPartyNames);
|
||||
List<AbstractParty> dealParty = Arrays.asList(getMEGA_CORP());
|
||||
QueryCriteria dealCriteria = new LinearStateQueryCriteria(dealParty, null, dealIds);
|
||||
|
||||
QueryCriteria compositeCriteria = and(dealCriteriaAll, vaultCriteria);
|
||||
QueryCriteria linearCriteria = new LinearStateQueryCriteria(dealParty, linearIds, null);
|
||||
|
||||
|
||||
QueryCriteria dealOrLinearIdCriteria = or(dealCriteria, linearCriteria);
|
||||
|
||||
QueryCriteria compositeCriteria = and(dealOrLinearIdCriteria, vaultCriteria);
|
||||
|
||||
PageSpecification pageSpec = new PageSpecification(0, getMAX_PAGE_SIZE());
|
||||
Sort.SortColumn sortByUid = new Sort.SortColumn(VaultLinearStateEntity.UUID.getName(), Sort.Direction.DESC, Sort.NullHandling.NULLS_LAST);
|
||||
Sort.SortColumn sortByUid = new Sort.SortColumn(new SortAttribute.Standard(Sort.LinearStateAttribute.UUID), Sort.Direction.DESC);
|
||||
Sort sorting = new Sort(ImmutableSet.of(sortByUid));
|
||||
Vault.PageAndUpdates<ContractState> results = vaultSvc.trackBy(compositeCriteria, pageSpec, sorting);
|
||||
Vault.PageAndUpdates<ContractState> results = vaultQuerySvc.trackBy(ContractState.class, compositeCriteria, pageSpec, sorting);
|
||||
|
||||
Vault.Page<ContractState> snapshot = results.getCurrent();
|
||||
Observable<Vault.Update> updates = results.getFuture();
|
||||
// DOCEND VaultJavaQueryExample2
|
||||
// DOCEND VaultJavaQueryExample5
|
||||
|
||||
assertThat(snapshot.getStates()).hasSize(4);
|
||||
|
||||
@ -239,6 +308,7 @@ public class VaultQueryJavaTests {
|
||||
@Test
|
||||
public void consumedStatesDeprecated() {
|
||||
transaction(database, tx -> {
|
||||
Amount<Currency> amount = new Amount<>(100, Currency.getInstance("USD"));
|
||||
fillWithSomeTestCash(services,
|
||||
new Amount<>(100, Currency.getInstance("USD")),
|
||||
getDUMMY_NOTARY(),
|
||||
@ -250,6 +320,8 @@ public class VaultQueryJavaTests {
|
||||
getDUMMY_CASH_ISSUER(),
|
||||
getDUMMY_CASH_ISSUER_KEY() );
|
||||
|
||||
consumeCash(services, amount);
|
||||
|
||||
// DOCSTART VaultDeprecatedJavaQueryExample1
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(Cash.State.class));
|
||||
@ -269,24 +341,21 @@ public class VaultQueryJavaTests {
|
||||
public void consumedStatesForLinearIdDeprecated() {
|
||||
transaction(database, tx -> {
|
||||
|
||||
UniqueIdentifier trackUid = new UniqueIdentifier();
|
||||
fillWithSomeTestLinearStates(services, 1, trackUid);
|
||||
fillWithSomeTestLinearStates(services, 4, new UniqueIdentifier());
|
||||
Vault<LinearState> linearStates = fillWithSomeTestLinearStates(services, 4,null);
|
||||
UniqueIdentifier trackUid = linearStates.getStates().iterator().next().component1().getData().getLinearId();
|
||||
|
||||
// DOCSTART VaultDeprecatedJavaQueryExample2
|
||||
consumeLinearStates(services, (List<? extends StateAndRef<? extends LinearState>>) linearStates.getStates());
|
||||
|
||||
// DOCSTART VaultDeprecatedJavaQueryExample0
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(LinearState.class));
|
||||
Set<Class<LinearState>> contractStateTypes = new HashSet(Collections.singletonList(DummyLinearContract.State.class));
|
||||
EnumSet<Vault.StateStatus> status = EnumSet.of(Vault.StateStatus.CONSUMED);
|
||||
|
||||
// WARNING! unfortunately cannot use inlined reified Kotlin extension methods.
|
||||
Iterable<StateAndRef<ContractState>> results = vaultSvc.states(contractStateTypes, status, true);
|
||||
|
||||
Stream<StateAndRef<ContractState>> trackedLinearState = StreamSupport.stream(results.spliterator(), false).filter(
|
||||
state -> ((LinearState) state.component1().getData()).getLinearId() == trackUid);
|
||||
// DOCEND VaultDeprecatedJavaQueryExample2
|
||||
Iterable<StateAndRef<LinearState>> results = vaultSvc.states(contractStateTypes, status, true);
|
||||
// DOCEND VaultDeprecatedJavaQueryExample0
|
||||
|
||||
assertThat(results).hasSize(4);
|
||||
assertThat(trackedLinearState).hasSize(1);
|
||||
|
||||
return tx;
|
||||
});
|
||||
|
@ -53,7 +53,8 @@ class CordaRPCOpsImplTest {
|
||||
lateinit var rpc: CordaRPCOpsImpl
|
||||
lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
|
||||
lateinit var transactions: Observable<SignedTransaction>
|
||||
lateinit var vaultUpdates: Observable<Vault.Update>
|
||||
lateinit var vaultUpdates: Observable<Vault.Update> // TODO: deprecated
|
||||
lateinit var vaultTrackCash: Observable<Vault.Update>
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
@ -71,6 +72,7 @@ class CordaRPCOpsImplTest {
|
||||
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
|
||||
transactions = rpc.verifiedTransactions().second
|
||||
vaultUpdates = rpc.vaultAndUpdates().second
|
||||
vaultTrackCash = rpc.vaultTrackBy<Cash.State>().future
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,12 +114,20 @@ class CordaRPCOpsImplTest {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: deprecated
|
||||
vaultUpdates.expectEvents {
|
||||
expect { update ->
|
||||
val actual = update.produced.single().state.data
|
||||
assertEquals(expectedState, actual)
|
||||
}
|
||||
}
|
||||
|
||||
vaultTrackCash.expectEvents {
|
||||
expect { update ->
|
||||
val actual = update.produced.single().state.data
|
||||
assertEquals(expectedState, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -180,6 +190,7 @@ class CordaRPCOpsImplTest {
|
||||
)
|
||||
}
|
||||
|
||||
// TODO: deprecated
|
||||
vaultUpdates.expectEvents {
|
||||
sequence(
|
||||
// ISSUE
|
||||
@ -194,6 +205,21 @@ class CordaRPCOpsImplTest {
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
vaultTrackCash.expectEvents {
|
||||
sequence(
|
||||
// ISSUE
|
||||
expect { update ->
|
||||
require(update.consumed.isEmpty()) { update.consumed.size }
|
||||
require(update.produced.size == 1) { update.produced.size }
|
||||
},
|
||||
// MOVE
|
||||
expect { update ->
|
||||
require(update.consumed.size == 1) { update.consumed.size }
|
||||
require(update.produced.size == 1) { update.produced.size }
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -12,7 +12,7 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.schemas.AttachmentEntity
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.node.utilities.transaction
|
||||
|
@ -22,6 +22,7 @@ import java.time.Clock
|
||||
|
||||
open class MockServiceHubInternal(
|
||||
val customVault: VaultService? = null,
|
||||
val customVaultQuery: VaultQueryService? = null,
|
||||
val keyManagement: KeyManagementService? = null,
|
||||
val network: MessagingService? = null,
|
||||
val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
|
||||
@ -32,6 +33,8 @@ open class MockServiceHubInternal(
|
||||
val schemas: SchemaService? = NodeSchemaService(),
|
||||
val customTransactionVerifierService: TransactionVerifierService? = InMemoryTransactionVerifierService(2)
|
||||
) : ServiceHubInternal() {
|
||||
override val vaultQueryService: VaultQueryService
|
||||
get() = customVaultQuery ?: throw UnsupportedOperationException()
|
||||
override val transactionVerifierService: TransactionVerifierService
|
||||
get() = customTransactionVerifierService ?: throw UnsupportedOperationException()
|
||||
override val vaultService: VaultService
|
||||
|
@ -0,0 +1,698 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import net.corda.contracts.asset.Cash
|
||||
import net.corda.contracts.asset.DummyFungibleContract
|
||||
import net.corda.contracts.testing.consumeCash
|
||||
import net.corda.contracts.testing.fillWithSomeTestCash
|
||||
import net.corda.contracts.testing.fillWithSomeTestDeals
|
||||
import net.corda.contracts.testing.fillWithSomeTestLinearStates
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.toBase58String
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.schemas.DummyLinearStateSchemaV1
|
||||
import net.corda.core.schemas.DummyLinearStateSchemaV2
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.storageKryo
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.ALICE
|
||||
import net.corda.core.utilities.BOB
|
||||
import net.corda.core.utilities.BOB_KEY
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.schemas.jpa.CommonSchemaV1
|
||||
import net.corda.node.services.vault.schemas.jpa.VaultSchemaV1
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.schemas.CashSchemaV1
|
||||
import net.corda.schemas.SampleCashSchemaV2
|
||||
import net.corda.schemas.SampleCashSchemaV3
|
||||
import net.corda.testing.BOB_PUBKEY
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.hibernate.SessionFactory
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.persistence.EntityManager
|
||||
import javax.persistence.Tuple
|
||||
import javax.persistence.criteria.CriteriaBuilder
|
||||
|
||||
class HibernateConfigurationTest {
|
||||
|
||||
lateinit var services: MockServices
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
val vault: VaultService get() = services.vaultService
|
||||
|
||||
// Hibernate configuration objects
|
||||
lateinit var hibernateConfig: HibernateConfiguration
|
||||
lateinit var hibernatePersister: HibernateObserver
|
||||
lateinit var sessionFactory: SessionFactory
|
||||
lateinit var entityManager: EntityManager
|
||||
lateinit var criteriaBuilder: CriteriaBuilder
|
||||
|
||||
// test States
|
||||
lateinit var cashStates: List<StateAndRef<Cash.State>>
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
val customSchemas = setOf(VaultSchemaV1, CashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3)
|
||||
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database.transaction {
|
||||
|
||||
hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas))
|
||||
|
||||
services = object : MockServices(BOB_KEY) {
|
||||
override val vaultService: VaultService get() {
|
||||
val vaultService = NodeVaultService(this, dataSourceProps)
|
||||
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
|
||||
return vaultService
|
||||
}
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
storageService.validatedTransactions.addTransaction(stx)
|
||||
}
|
||||
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
|
||||
vaultService.notifyAll(txs.map { it.tx })
|
||||
}
|
||||
}
|
||||
}
|
||||
setUpDb()
|
||||
|
||||
sessionFactory = hibernateConfig.sessionFactoryForSchemas(*customSchemas.toTypedArray())
|
||||
entityManager = sessionFactory.createEntityManager()
|
||||
criteriaBuilder = sessionFactory.criteriaBuilder
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
dataSource.close()
|
||||
}
|
||||
|
||||
private fun setUpDb() {
|
||||
database.transaction {
|
||||
cashStates = services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 10, 10, Random(0L)).states.toList()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `count rows`() {
|
||||
// structure query
|
||||
val countQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
countQuery.select(criteriaBuilder.count(countQuery.from(VaultSchemaV1.VaultStates::class.java)))
|
||||
|
||||
// execute query
|
||||
val countResult = entityManager.createQuery(countQuery).singleResult
|
||||
|
||||
assertThat(countResult).isEqualTo(10)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consumed states`() {
|
||||
database.transaction {
|
||||
services.consumeCash(50.DOLLARS)
|
||||
}
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
criteriaQuery.where(criteriaBuilder.equal(
|
||||
vaultStates.get<Vault.StateStatus>("stateStatus"), Vault.StateStatus.CONSUMED))
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
assertThat(queryResults.size).isEqualTo(6)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `select by composite primary key`() {
|
||||
val issuedStates =
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(8)
|
||||
services.fillWithSomeTestLinearStates(2)
|
||||
}
|
||||
val persistentStateRefs = issuedStates.states.map { PersistentStateRef(it.ref) }.toList()
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val compositeKey = vaultStates.get<PersistentStateRef>("stateRef")
|
||||
criteriaQuery.where(criteriaBuilder.and(compositeKey.`in`(persistentStateRefs)))
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
|
||||
assertThat(queryResults).hasSize(2)
|
||||
assertThat(queryResults.first().stateRef?.txId).isEqualTo(issuedStates.states.first().ref.txhash.toString())
|
||||
assertThat(queryResults.first().stateRef?.index).isEqualTo(issuedStates.states.first().ref.index)
|
||||
assertThat(queryResults.last().stateRef?.txId).isEqualTo(issuedStates.states.last().ref.txhash.toString())
|
||||
assertThat(queryResults.last().stateRef?.index).isEqualTo(issuedStates.states.last().ref.index)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `distinct contract types`() {
|
||||
database.transaction {
|
||||
// add 2 more contract types
|
||||
services.fillWithSomeTestLinearStates(10)
|
||||
services.fillWithSomeTestDeals(listOf("123", "456", "789"))
|
||||
}
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(String::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
criteriaQuery.select(vaultStates.get("contractStateClassName")).distinct(true)
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
Assertions.assertThat(queryResults.size).isEqualTo(3)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `with sorting`() {
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
|
||||
// order by DESC
|
||||
criteriaQuery.orderBy(criteriaBuilder.desc(vaultStates.get<Instant>("recordedTime")))
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
queryResults.map { println(it.recordedTime) }
|
||||
|
||||
// order by ASC
|
||||
criteriaQuery.orderBy(criteriaBuilder.asc(vaultStates.get<Instant>("recordedTime")))
|
||||
val queryResultsAsc = entityManager.createQuery(criteriaQuery).resultList
|
||||
queryResultsAsc.map { println(it.recordedTime) }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `with pagination`() {
|
||||
// add 100 additional cash entries
|
||||
database.transaction {
|
||||
services.fillWithSomeTestCash(1000.POUNDS, DUMMY_NOTARY, 100, 100, Random(0L))
|
||||
}
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
|
||||
// set pagination
|
||||
val query = entityManager.createQuery(criteriaQuery)
|
||||
query.firstResult = 10
|
||||
query.maxResults = 15
|
||||
|
||||
// execute query
|
||||
val queryResults = query.resultList
|
||||
Assertions.assertThat(queryResults.size).isEqualTo(15)
|
||||
|
||||
// try towards end
|
||||
query.firstResult = 100
|
||||
query.maxResults = 15
|
||||
|
||||
val lastQueryResults = query.resultList
|
||||
|
||||
Assertions.assertThat(lastQueryResults.size).isEqualTo(10)
|
||||
}
|
||||
|
||||
/**
|
||||
* VaultLinearState is a concrete table, extendible by any Contract extending a LinearState
|
||||
*/
|
||||
@Test
|
||||
fun `select by composite primary key on LinearStates`() {
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(10)
|
||||
}
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultLinearStates = criteriaQuery.from(VaultSchemaV1.VaultLinearStates::class.java)
|
||||
|
||||
criteriaQuery.select(vaultStates)
|
||||
criteriaQuery.where(criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultLinearStates.get<PersistentStateRef>("stateRef")))
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
assertThat(queryResults).hasSize(10)
|
||||
}
|
||||
|
||||
/**
|
||||
* VaultFungibleState is an abstract entity, which should be extended any Contract extending a FungibleAsset
|
||||
*/
|
||||
|
||||
/**
|
||||
* CashSchemaV1 = original Cash schema (extending PersistentState)
|
||||
*/
|
||||
@Test
|
||||
fun `count CashStates`() {
|
||||
// structure query
|
||||
val countQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
countQuery.select(criteriaBuilder.count(countQuery.from(CashSchemaV1.PersistentCashState::class.java)))
|
||||
|
||||
// execute query
|
||||
val countResult = entityManager.createQuery(countQuery).singleResult
|
||||
|
||||
assertThat(countResult).isEqualTo(10)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `select by composite primary key on CashStates`() {
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
vaultStates.join<VaultSchemaV1.VaultStates, CashSchemaV1.PersistentCashState>("stateRef")
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
assertThat(queryResults).hasSize(10)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `select and join by composite primary key on CashStates`() {
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(5)
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultCashStates = criteriaQuery.from(CashSchemaV1.PersistentCashState::class.java)
|
||||
|
||||
criteriaQuery.select(vaultStates)
|
||||
criteriaQuery.where(criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultCashStates.get<PersistentStateRef>("stateRef")))
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
assertThat(queryResults).hasSize(10)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* CashSchemaV2 = optimised Cash schema (extending FungibleState)
|
||||
*/
|
||||
@Test
|
||||
fun `count CashStates in V2`() {
|
||||
database.transaction {
|
||||
// persist cash states explicitly with V2 schema
|
||||
cashStates.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV2)
|
||||
}
|
||||
}
|
||||
|
||||
// structure query
|
||||
val countQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
countQuery.select(criteriaBuilder.count(countQuery.from(SampleCashSchemaV2.PersistentCashState::class.java)))
|
||||
|
||||
// execute query
|
||||
val countResult = entityManager.createQuery(countQuery).singleResult
|
||||
|
||||
assertThat(countResult).isEqualTo(10)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `select by composite primary key on CashStates in V2`() {
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(5)
|
||||
|
||||
// persist cash states explicitly with V2 schema
|
||||
cashStates.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV2)
|
||||
}
|
||||
}
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultCashStates = criteriaQuery.from(SampleCashSchemaV2.PersistentCashState::class.java)
|
||||
|
||||
criteriaQuery.select(vaultStates)
|
||||
criteriaQuery.where(criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultCashStates.get<PersistentStateRef>("stateRef")))
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
assertThat(queryResults).hasSize(10)
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a 3-way join between:
|
||||
* - VaultStates
|
||||
* - VaultLinearStates
|
||||
* - a concrete LinearState implementation (eg. DummyLinearState)
|
||||
*/
|
||||
|
||||
/**
|
||||
* DummyLinearStateV1 = original DummyLinearState schema (extending PersistentState)
|
||||
*/
|
||||
@Test
|
||||
fun `select by composite primary between VaultStates, VaultLinearStates and DummyLinearStates`() {
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(8)
|
||||
services.fillWithSomeTestDeals(listOf("123", "456", "789"))
|
||||
services.fillWithSomeTestLinearStates(2)
|
||||
}
|
||||
|
||||
val sessionFactory = hibernateConfig.sessionFactoryForSchemas(VaultSchemaV1, DummyLinearStateSchemaV1)
|
||||
val criteriaBuilder = sessionFactory.criteriaBuilder
|
||||
val entityManager = sessionFactory.createEntityManager()
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultLinearStates = criteriaQuery.from(VaultSchemaV1.VaultLinearStates::class.java)
|
||||
val dummyLinearStates = criteriaQuery.from(DummyLinearStateSchemaV1.PersistentDummyLinearState::class.java)
|
||||
|
||||
criteriaQuery.select(vaultStates)
|
||||
val joinPredicate1 = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultLinearStates.get<PersistentStateRef>("stateRef"))
|
||||
val joinPredicate2 = criteriaBuilder.and(criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), dummyLinearStates.get<PersistentStateRef>("stateRef")))
|
||||
criteriaQuery.where(joinPredicate1, joinPredicate2)
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
assertThat(queryResults).hasSize(10)
|
||||
}
|
||||
|
||||
/**
|
||||
* DummyLinearSchemaV2 = optimised DummyLinear schema (extending LinearState)
|
||||
*/
|
||||
|
||||
@Test
|
||||
fun `three way join by composite primary between VaultStates, VaultLinearStates and DummyLinearStates`() {
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(8)
|
||||
services.fillWithSomeTestDeals(listOf("123", "456", "789"))
|
||||
services.fillWithSomeTestLinearStates(2)
|
||||
}
|
||||
|
||||
val sessionFactory = hibernateConfig.sessionFactoryForSchemas(VaultSchemaV1, DummyLinearStateSchemaV2)
|
||||
val criteriaBuilder = sessionFactory.criteriaBuilder
|
||||
val entityManager = sessionFactory.createEntityManager()
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultLinearStates = criteriaQuery.from(VaultSchemaV1.VaultLinearStates::class.java)
|
||||
val dummyLinearStates = criteriaQuery.from(DummyLinearStateSchemaV2.PersistentDummyLinearState::class.java)
|
||||
|
||||
criteriaQuery.select(vaultStates)
|
||||
val joinPredicate1 = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultLinearStates.get<PersistentStateRef>("stateRef"))
|
||||
val joinPredicate2 = criteriaBuilder.and(criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), dummyLinearStates.get<PersistentStateRef>("stateRef")))
|
||||
criteriaQuery.where(joinPredicate1, joinPredicate2)
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
assertThat(queryResults).hasSize(10)
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a OneToOne table mapping
|
||||
*/
|
||||
@Test
|
||||
fun `select fungible states by owner party`() {
|
||||
database.transaction {
|
||||
// persist original cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
}
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(SampleCashSchemaV3.PersistentCashState::class.java)
|
||||
criteriaQuery.from(SampleCashSchemaV3.PersistentCashState::class.java)
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
assertThat(queryResults).hasSize(10)
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Query by Party (OneToOne table mapping)
|
||||
*/
|
||||
@Test
|
||||
fun `query fungible states by owner party`() {
|
||||
database.transaction {
|
||||
// persist original cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 2, 2, Random(0L), ownedBy = ALICE)
|
||||
val cashStates = services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 2, 2, Random(0L),
|
||||
issuedBy = BOB.ref(0), issuerKey = BOB_KEY, ownedBy = (BOB)).states
|
||||
// persist additional cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
}
|
||||
|
||||
val sessionFactory = hibernateConfig.sessionFactoryForSchemas(VaultSchemaV1, CommonSchemaV1, SampleCashSchemaV3)
|
||||
val criteriaBuilder = sessionFactory.criteriaBuilder
|
||||
val entityManager = sessionFactory.createEntityManager()
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
|
||||
// select
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
criteriaQuery.select(vaultStates)
|
||||
|
||||
// search predicate
|
||||
val cashStatesSchema = criteriaQuery.from(SampleCashSchemaV3.PersistentCashState::class.java)
|
||||
|
||||
val joinCashToParty = cashStatesSchema.join<SampleCashSchemaV3.PersistentCashState,CommonSchemaV1.Party>("owner")
|
||||
val queryOwnerKey = BOB_PUBKEY.toBase58String()
|
||||
criteriaQuery.where(criteriaBuilder.equal(joinCashToParty.get<CommonSchemaV1.Party>("key"), queryOwnerKey))
|
||||
|
||||
val joinVaultStatesToCash = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), cashStatesSchema.get<PersistentStateRef>("stateRef"))
|
||||
criteriaQuery.where(joinVaultStatesToCash)
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
|
||||
queryResults.forEach {
|
||||
val contractState = it.contractState.deserialize<TransactionState<ContractState>>(storageKryo())
|
||||
val cashState = contractState.data as Cash.State
|
||||
println("${it.stateRef} with owner: ${cashState.owner.owningKey.toBase58String()}") }
|
||||
|
||||
assertThat(queryResults).hasSize(12)
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a OneToMany table mapping
|
||||
*/
|
||||
@Test
|
||||
fun `select fungible states by participants`() {
|
||||
database.transaction {
|
||||
// persist cash states explicitly with V2 schema
|
||||
cashStates.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
}
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(SampleCashSchemaV3.PersistentCashState::class.java)
|
||||
criteriaQuery.from(SampleCashSchemaV3.PersistentCashState::class.java)
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
|
||||
assertThat(queryResults).hasSize(10)
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Query by participants (OneToMany table mapping)
|
||||
*/
|
||||
@Test
|
||||
fun `query fungible states by participants`() {
|
||||
val firstCashState =
|
||||
database.transaction {
|
||||
// persist original cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
|
||||
val moreCash = services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 2, 2, Random(0L),
|
||||
issuedBy = BOB.ref(0), issuerKey = BOB_KEY, ownedBy = BOB).states
|
||||
// persist additional cash states explicitly with V3 schema
|
||||
moreCash.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
|
||||
val cashStates = services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 2, 2, Random(0L), ownedBy = (ALICE)).states
|
||||
// persist additional cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
cashStates.first()
|
||||
}
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultStates::class.java)
|
||||
|
||||
// select
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
criteriaQuery.select(vaultStates)
|
||||
|
||||
// search predicate
|
||||
val cashStatesSchema = criteriaQuery.from(SampleCashSchemaV3.PersistentCashState::class.java)
|
||||
|
||||
val joinCashToParty = cashStatesSchema.join<SampleCashSchemaV3.PersistentCashState, CommonSchemaV1.Party>("participants")
|
||||
val queryParticipantKeys = firstCashState.state.data.participants.map { it.owningKey.toBase58String() }
|
||||
criteriaQuery.where(criteriaBuilder.equal(joinCashToParty.get<CommonSchemaV1.Party>("key"), queryParticipantKeys))
|
||||
|
||||
val joinVaultStatesToCash = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), cashStatesSchema.get<PersistentStateRef>("stateRef"))
|
||||
criteriaQuery.where(joinVaultStatesToCash)
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
queryResults.forEach {
|
||||
val contractState = it.contractState.deserialize<TransactionState<ContractState>>(storageKryo())
|
||||
val cashState = contractState.data as Cash.State
|
||||
println("${it.stateRef} with owner ${cashState.owner.owningKey.toBase58String()} and participants ${cashState.participants.map { it.owningKey.toBase58String() }}")
|
||||
}
|
||||
|
||||
assertThat(queryResults).hasSize(12)
|
||||
}
|
||||
|
||||
/**
|
||||
* Query with sorting on Common table attribute
|
||||
*/
|
||||
@Test
|
||||
fun `with sorting on attribute from common table`() {
|
||||
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(1, externalId = "111")
|
||||
services.fillWithSomeTestLinearStates(2, externalId = "222")
|
||||
services.fillWithSomeTestLinearStates(3, externalId = "333")
|
||||
}
|
||||
|
||||
val sessionFactory = hibernateConfig.sessionFactoryForSchemas(VaultSchemaV1, DummyLinearStateSchemaV2)
|
||||
val criteriaBuilder = sessionFactory.criteriaBuilder
|
||||
val entityManager = sessionFactory.createEntityManager()
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultLinearStates = criteriaQuery.from(VaultSchemaV1.VaultLinearStates::class.java)
|
||||
|
||||
// join
|
||||
criteriaQuery.multiselect(vaultStates, vaultLinearStates)
|
||||
val joinPredicate = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultLinearStates.get<PersistentStateRef>("stateRef"))
|
||||
criteriaQuery.where(joinPredicate)
|
||||
|
||||
// order by DESC
|
||||
criteriaQuery.orderBy(criteriaBuilder.desc(vaultLinearStates.get<String>("externalId")))
|
||||
criteriaQuery.orderBy(criteriaBuilder.desc(vaultLinearStates.get<UUID>("uuid")))
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
queryResults.map {
|
||||
val vaultState = it[0] as VaultSchemaV1.VaultStates
|
||||
val vaultLinearState = it[1] as VaultSchemaV1.VaultLinearStates
|
||||
println("${vaultState.stateRef} : ${vaultLinearState.externalId} ${vaultLinearState.uuid}")
|
||||
}
|
||||
|
||||
// order by ASC
|
||||
criteriaQuery.orderBy(criteriaBuilder.asc(vaultLinearStates.get<String>("externalId")))
|
||||
criteriaQuery.orderBy(criteriaBuilder.asc(vaultLinearStates.get<UUID>("uuid")))
|
||||
|
||||
// execute query
|
||||
val queryResultsAsc = entityManager.createQuery(criteriaQuery).resultList
|
||||
queryResultsAsc.map {
|
||||
val vaultState = it[0] as VaultSchemaV1.VaultStates
|
||||
val vaultLinearState = it[1] as VaultSchemaV1.VaultLinearStates
|
||||
println("${vaultState.stateRef} : ${vaultLinearState.externalId} ${vaultLinearState.uuid}")
|
||||
}
|
||||
|
||||
assertThat(queryResults).hasSize(6)
|
||||
}
|
||||
|
||||
/**
|
||||
* Query with sorting on Custom table attribute
|
||||
*/
|
||||
@Test
|
||||
fun `with sorting on attribute from custom table`() {
|
||||
|
||||
database.transaction {
|
||||
services.fillWithSomeTestLinearStates(1, externalId = "111")
|
||||
services.fillWithSomeTestLinearStates(2, externalId = "222")
|
||||
services.fillWithSomeTestLinearStates(3, externalId = "333")
|
||||
}
|
||||
|
||||
val sessionFactory = hibernateConfig.sessionFactoryForSchemas(VaultSchemaV1, DummyLinearStateSchemaV1)
|
||||
val criteriaBuilder = sessionFactory.criteriaBuilder
|
||||
val entityManager = sessionFactory.createEntityManager()
|
||||
|
||||
// structure query
|
||||
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
|
||||
val vaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
val vaultLinearStates = criteriaQuery.from(VaultSchemaV1.VaultLinearStates::class.java)
|
||||
val dummyLinearStates = criteriaQuery.from(DummyLinearStateSchemaV1.PersistentDummyLinearState::class.java)
|
||||
|
||||
// join
|
||||
criteriaQuery.multiselect(vaultStates, vaultLinearStates, dummyLinearStates)
|
||||
val joinPredicate1 = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), vaultLinearStates.get<PersistentStateRef>("stateRef"))
|
||||
val joinPredicate2 = criteriaBuilder.and(criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), dummyLinearStates.get<PersistentStateRef>("stateRef")))
|
||||
criteriaQuery.where(joinPredicate1, joinPredicate2)
|
||||
|
||||
// order by DESC
|
||||
criteriaQuery.orderBy(criteriaBuilder.desc(dummyLinearStates.get<String>("externalId")))
|
||||
criteriaQuery.orderBy(criteriaBuilder.desc(dummyLinearStates.get<UUID>("uuid")))
|
||||
|
||||
// execute query
|
||||
val queryResults = entityManager.createQuery(criteriaQuery).resultList
|
||||
queryResults.map {
|
||||
val vaultState = it[0] as VaultSchemaV1.VaultStates
|
||||
val _vaultLinearStates = it[1] as VaultSchemaV1.VaultLinearStates
|
||||
val _dummyLinearStates = it[2] as DummyLinearStateSchemaV1.PersistentDummyLinearState
|
||||
println("${vaultState.stateRef} : [${_dummyLinearStates.externalId} ${_dummyLinearStates.uuid}] : [${_vaultLinearStates.externalId} ${_vaultLinearStates.uuid}]")
|
||||
}
|
||||
|
||||
// order by ASC
|
||||
criteriaQuery.orderBy(criteriaBuilder.asc(dummyLinearStates.get<String>("externalId")))
|
||||
criteriaQuery.orderBy(criteriaBuilder.asc(dummyLinearStates.get<UUID>("uuid")))
|
||||
|
||||
// execute query
|
||||
val queryResultsAsc = entityManager.createQuery(criteriaQuery).resultList
|
||||
queryResultsAsc.map {
|
||||
val vaultState = it[0] as VaultSchemaV1.VaultStates
|
||||
val _vaultLinearStates = it[1] as VaultSchemaV1.VaultLinearStates
|
||||
val _dummyLinearStates = it[2] as DummyLinearStateSchemaV1.PersistentDummyLinearState
|
||||
println("${vaultState.stateRef} : [${_dummyLinearStates.externalId} ${_dummyLinearStates.uuid}] : [${_vaultLinearStates.externalId} ${_vaultLinearStates.uuid}]")
|
||||
}
|
||||
|
||||
assertThat(queryResults).hasSize(6)
|
||||
}
|
||||
|
||||
}
|
@ -19,10 +19,10 @@ import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.core.utilities.DUMMY_PUBKEY_1
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.vault.schemas.Models
|
||||
import net.corda.node.services.vault.schemas.VaultCashBalancesEntity
|
||||
import net.corda.node.services.vault.schemas.VaultSchema
|
||||
import net.corda.node.services.vault.schemas.VaultStatesEntity
|
||||
import net.corda.node.services.vault.schemas.requery.Models
|
||||
import net.corda.node.services.vault.schemas.requery.VaultCashBalancesEntity
|
||||
import net.corda.node.services.vault.schemas.requery.VaultSchema
|
||||
import net.corda.node.services.vault.schemas.requery.VaultStatesEntity
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
@ -123,6 +124,55 @@ class RequeryConfigurationTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `bounded iteration`() {
|
||||
// insert 100 entities
|
||||
database.transaction {
|
||||
requerySession.withTransaction {
|
||||
(1..100)
|
||||
.map { newTransaction(it) }
|
||||
.forEach { insert(createVaultStateEntity(it)) }
|
||||
}
|
||||
}
|
||||
|
||||
// query entities 41..45
|
||||
database.transaction {
|
||||
requerySession.withTransaction {
|
||||
// Note: cannot specify a limit explicitly when using iterator skip & take
|
||||
val query = select(VaultSchema.VaultStates::class)
|
||||
val count = query.get().count()
|
||||
Assertions.assertThat(count).isEqualTo(100)
|
||||
val result = query.get().iterator(40, 5)
|
||||
Assertions.assertThat(result.asSequence().count()).isEqualTo(5)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test calling an arbitrary JDBC native query`() {
|
||||
val txn = newTransaction()
|
||||
|
||||
database.transaction {
|
||||
transactionStorage.addTransaction(txn)
|
||||
requerySession.withTransaction {
|
||||
insert(createVaultStateEntity(txn))
|
||||
}
|
||||
}
|
||||
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
val nativeQuery = "SELECT v.transaction_id, v.output_index FROM vault_states v WHERE v.state_status = 0"
|
||||
|
||||
database.transaction {
|
||||
val configuration = RequeryConfiguration(dataSourceProperties, true)
|
||||
val jdbcSession = configuration.jdbcSession()
|
||||
val prepStatement = jdbcSession.prepareStatement(nativeQuery)
|
||||
val rs = prepStatement.executeQuery()
|
||||
assertTrue(rs.next())
|
||||
assertEquals(rs.getString(1), txn.tx.inputs[0].txhash.toString())
|
||||
assertEquals(rs.getInt(2), txn.tx.inputs[0].index)
|
||||
}
|
||||
}
|
||||
|
||||
private fun createVaultStateEntity(txn: SignedTransaction): VaultStatesEntity {
|
||||
val txnState = txn.tx.inputs[0]
|
||||
val state = VaultStatesEntity().apply {
|
||||
@ -158,9 +208,9 @@ class RequeryConfigurationTest {
|
||||
}
|
||||
}
|
||||
|
||||
private fun newTransaction(): SignedTransaction {
|
||||
private fun newTransaction(index: Int = 0): SignedTransaction {
|
||||
val wtx = WireTransaction(
|
||||
inputs = listOf(StateRef(SecureHash.randomSHA256(), 0)),
|
||||
inputs = listOf(StateRef(SecureHash.randomSHA256(), index)),
|
||||
attachments = emptyList(),
|
||||
outputs = emptyList(),
|
||||
commands = emptyList(),
|
||||
|
@ -11,7 +11,7 @@ import net.corda.core.utilities.LogHelper
|
||||
import net.corda.core.write
|
||||
import net.corda.core.writeLines
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.persistence.schemas.AttachmentEntity
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
|
@ -1,9 +1,6 @@
|
||||
package net.corda.node.services.schema
|
||||
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.AbstractParty
|
||||
@ -13,6 +10,7 @@ import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
@ -99,9 +97,9 @@ class HibernateObserverTests {
|
||||
val schemaService = object : SchemaService {
|
||||
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = emptyMap()
|
||||
|
||||
override fun selectSchemas(state: QueryableState): Iterable<MappedSchema> = setOf(testSchema)
|
||||
override fun selectSchemas(state: ContractState): Iterable<MappedSchema> = setOf(testSchema)
|
||||
|
||||
override fun generateMappedObject(state: QueryableState, schema: MappedSchema): PersistentState {
|
||||
override fun generateMappedObject(state: ContractState, schema: MappedSchema): PersistentState {
|
||||
val parent = Parent()
|
||||
parent.children.add(Child())
|
||||
parent.children.add(Child())
|
||||
@ -110,14 +108,14 @@ class HibernateObserverTests {
|
||||
}
|
||||
|
||||
@Suppress("UNUSED_VARIABLE")
|
||||
val observer = HibernateObserver(rawUpdatesPublisher, schemaService)
|
||||
val observer = HibernateObserver(rawUpdatesPublisher, HibernateConfiguration(schemaService))
|
||||
database.transaction {
|
||||
rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))))
|
||||
val parentRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from contract_Parents").executeQuery()
|
||||
val parentRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery()
|
||||
parentRowCountResult.next()
|
||||
val parentRows = parentRowCountResult.getInt(1)
|
||||
parentRowCountResult.close()
|
||||
val childrenRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from contract_Children").executeQuery()
|
||||
val childrenRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from Children").executeQuery()
|
||||
childrenRowCountResult.next()
|
||||
val childrenRows = childrenRowCountResult.getInt(1)
|
||||
childrenRowCountResult.close()
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import net.corda.contracts.DummyDealContract
|
||||
import net.corda.contracts.asset.Cash
|
||||
import net.corda.contracts.asset.DUMMY_CASH_ISSUER
|
||||
import net.corda.contracts.testing.*
|
||||
|
Reference in New Issue
Block a user