mirror of
https://github.com/corda/corda.git
synced 2025-06-21 08:40:03 +00:00
Vault Query performance fix (#1256)
* Do not query database to maintain list of contract state interfaces to concrete concrete state types (use vault observable to construct same) (note this mechanism is tied to transaction boundaries for visibility of updated states) * Build contract types list from vault rawupdates observable(to avoid waiting for transaction commits). Reverted all JUnits to original state. * Bootstrap map from vault database state (node re-start) Skip reflection for already seen types. * Explicitly close session instances after query execution. * Use auto-closeable to scope sessions.
This commit is contained in:
@ -772,7 +772,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
override val networkMapCache by lazy { InMemoryNetworkMapCache(this) }
|
override val networkMapCache by lazy { InMemoryNetworkMapCache(this) }
|
||||||
override val vaultService by lazy { NodeVaultService(this, configuration.dataSourceProperties, configuration.database) }
|
override val vaultService by lazy { NodeVaultService(this, configuration.dataSourceProperties, configuration.database) }
|
||||||
override val vaultQueryService by lazy {
|
override val vaultQueryService by lazy {
|
||||||
HibernateVaultQueryImpl(HibernateConfiguration(schemaService, configuration.database ?: Properties(), { identityService }), vaultService.updatesPublisher)
|
HibernateVaultQueryImpl(HibernateConfiguration(schemaService, configuration.database ?: Properties(), { identityService }), vaultService)
|
||||||
}
|
}
|
||||||
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
||||||
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
|
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
|
||||||
|
@ -21,7 +21,7 @@ import javax.persistence.criteria.*
|
|||||||
|
|
||||||
|
|
||||||
class HibernateQueryCriteriaParser(val contractType: Class<out ContractState>,
|
class HibernateQueryCriteriaParser(val contractType: Class<out ContractState>,
|
||||||
val contractTypeMappings: Map<String, List<String>>,
|
val contractTypeMappings: Map<String, Set<String>>,
|
||||||
val criteriaBuilder: CriteriaBuilder,
|
val criteriaBuilder: CriteriaBuilder,
|
||||||
val criteriaQuery: CriteriaQuery<Tuple>,
|
val criteriaQuery: CriteriaQuery<Tuple>,
|
||||||
val vaultStates: Root<VaultSchemaV1.VaultStates>) : IQueryCriteriaParser {
|
val vaultStates: Root<VaultSchemaV1.VaultStates>) : IQueryCriteriaParser {
|
||||||
@ -97,7 +97,7 @@ class HibernateQueryCriteriaParser(val contractType: Class<out ContractState>,
|
|||||||
private fun deriveContractTypes(contractStateTypes: Set<Class<out ContractState>>? = null): List<String> {
|
private fun deriveContractTypes(contractStateTypes: Set<Class<out ContractState>>? = null): List<String> {
|
||||||
val combinedContractStateTypes = contractStateTypes?.plus(contractType) ?: setOf(contractType)
|
val combinedContractStateTypes = contractStateTypes?.plus(contractType) ?: setOf(contractType)
|
||||||
combinedContractStateTypes.filter { it.name != ContractState::class.java.name }.let {
|
combinedContractStateTypes.filter { it.name != ContractState::class.java.name }.let {
|
||||||
val interfaces = it.flatMap { contractTypeMappings[it.name] ?: emptyList() }
|
val interfaces = it.flatMap { contractTypeMappings[it.name] ?: listOf(it.name) }
|
||||||
val concrete = it.filter { !it.isInterface }.map { it.name }
|
val concrete = it.filter { !it.isInterface }.map { it.name }
|
||||||
return interfaces.plus(concrete)
|
return interfaces.plus(concrete)
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,17 @@
|
|||||||
package net.corda.node.services.vault
|
package net.corda.node.services.vault
|
||||||
|
|
||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.bufferUntilSubscribed
|
|
||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
import net.corda.core.contracts.StateAndRef
|
import net.corda.core.contracts.StateAndRef
|
||||||
import net.corda.core.contracts.StateRef
|
import net.corda.core.contracts.StateRef
|
||||||
import net.corda.core.contracts.TransactionState
|
import net.corda.core.contracts.TransactionState
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import net.corda.core.internal.bufferUntilSubscribed
|
||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.node.services.Vault
|
import net.corda.core.node.services.Vault
|
||||||
import net.corda.core.node.services.VaultQueryException
|
import net.corda.core.node.services.VaultQueryException
|
||||||
import net.corda.core.node.services.VaultQueryService
|
import net.corda.core.node.services.VaultQueryService
|
||||||
|
import net.corda.core.node.services.VaultService
|
||||||
import net.corda.core.node.services.vault.*
|
import net.corda.core.node.services.vault.*
|
||||||
import net.corda.core.node.services.vault.QueryCriteria.VaultCustomQueryCriteria
|
import net.corda.core.node.services.vault.QueryCriteria.VaultCustomQueryCriteria
|
||||||
import net.corda.core.serialization.SerializationDefaults.STORAGE_CONTEXT
|
import net.corda.core.serialization.SerializationDefaults.STORAGE_CONTEXT
|
||||||
@ -18,18 +19,18 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
|||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
|
import net.corda.core.utilities.trace
|
||||||
import net.corda.node.services.database.HibernateConfiguration
|
import net.corda.node.services.database.HibernateConfiguration
|
||||||
|
import org.hibernate.Session
|
||||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||||
import rx.subjects.PublishSubject
|
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import java.lang.Exception
|
import java.lang.Exception
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import javax.persistence.EntityManager
|
|
||||||
import javax.persistence.Tuple
|
import javax.persistence.Tuple
|
||||||
|
|
||||||
|
|
||||||
class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||||
val updatesPublisher: PublishSubject<Vault.Update<ContractState>>) : SingletonSerializeAsToken(), VaultQueryService {
|
val vault: VaultService) : SingletonSerializeAsToken(), VaultQueryService {
|
||||||
companion object {
|
companion object {
|
||||||
val log = loggerFor<HibernateVaultQueryImpl>()
|
val log = loggerFor<HibernateVaultQueryImpl>()
|
||||||
}
|
}
|
||||||
@ -37,6 +38,29 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
|||||||
private val sessionFactory = hibernateConfig.sessionFactoryForRegisteredSchemas()
|
private val sessionFactory = hibernateConfig.sessionFactoryForRegisteredSchemas()
|
||||||
private val criteriaBuilder = sessionFactory.criteriaBuilder
|
private val criteriaBuilder = sessionFactory.criteriaBuilder
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maintain a list of contract state interfaces to concrete types stored in the vault
|
||||||
|
* for usage in generic queries of type queryBy<LinearState> or queryBy<FungibleState<*>>
|
||||||
|
*/
|
||||||
|
private val contractTypeMappings = bootstrapContractStateTypes()
|
||||||
|
|
||||||
|
init {
|
||||||
|
vault.rawUpdates.subscribe { update ->
|
||||||
|
update.produced.forEach {
|
||||||
|
val concreteType = it.state.data.javaClass
|
||||||
|
log.trace { "State update of type: $concreteType" }
|
||||||
|
val seen = contractTypeMappings.any { it.value.contains(concreteType.name) }
|
||||||
|
if (!seen) {
|
||||||
|
val contractInterfaces = deriveContractInterfaces(concreteType)
|
||||||
|
contractInterfaces.map {
|
||||||
|
val contractInterface = contractTypeMappings.getOrPut(it.name, { mutableSetOf() })
|
||||||
|
contractInterface.add(concreteType.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Throws(VaultQueryException::class)
|
@Throws(VaultQueryException::class)
|
||||||
override fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): Vault.Page<T> {
|
override fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): Vault.Page<T> {
|
||||||
log.info("Vault Query for contract type: $contractType, criteria: $criteria, pagination: $paging, sorting: $sorting")
|
log.info("Vault Query for contract type: $contractType, criteria: $criteria, pagination: $paging, sorting: $sorting")
|
||||||
@ -50,15 +74,12 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
|||||||
totalStates = results.otherResults[0] as Long
|
totalStates = results.otherResults[0] as Long
|
||||||
}
|
}
|
||||||
|
|
||||||
val session = sessionFactory.withOptions().
|
val session = getSession()
|
||||||
connection(TransactionManager.current().connection).
|
|
||||||
openSession()
|
|
||||||
|
|
||||||
session.use {
|
session.use {
|
||||||
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
|
val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java)
|
||||||
val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||||
|
|
||||||
val contractTypeMappings = resolveUniqueContractStateTypes(session)
|
|
||||||
// TODO: revisit (use single instance of parser for all queries)
|
// TODO: revisit (use single instance of parser for all queries)
|
||||||
val criteriaParser = HibernateQueryCriteriaParser(contractType, contractTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates)
|
val criteriaParser = HibernateQueryCriteriaParser(contractType, contractTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates)
|
||||||
|
|
||||||
@ -116,42 +137,50 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val mutex = ThreadBox({ updatesPublisher })
|
private val mutex = ThreadBox({ vault.updatesPublisher })
|
||||||
|
|
||||||
@Throws(VaultQueryException::class)
|
@Throws(VaultQueryException::class)
|
||||||
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
||||||
return mutex.locked {
|
return mutex.locked {
|
||||||
val snapshotResults = _queryBy(criteria, paging, sorting, contractType)
|
val snapshotResults = _queryBy(criteria, paging, sorting, contractType)
|
||||||
@Suppress("UNCHECKED_CAST")
|
@Suppress("UNCHECKED_CAST")
|
||||||
val updates = updatesPublisher.bufferUntilSubscribed().filter { it.containsType(contractType, snapshotResults.stateTypes) } as Observable<Vault.Update<T>>
|
val updates = vault.updatesPublisher.bufferUntilSubscribed().filter { it.containsType(contractType, snapshotResults.stateTypes) } as Observable<Vault.Update<T>>
|
||||||
DataFeed(snapshotResults, updates)
|
DataFeed(snapshotResults, updates)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun getSession(): Session {
|
||||||
|
return sessionFactory.withOptions().
|
||||||
|
connection(TransactionManager.current().connection).
|
||||||
|
openSession()
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maintain a list of contract state interfaces to concrete types stored in the vault
|
* Derive list from existing vault states and then incrementally update using vault observables
|
||||||
* for usage in generic queries of type queryBy<LinearState> or queryBy<FungibleState<*>>
|
|
||||||
*/
|
*/
|
||||||
fun resolveUniqueContractStateTypes(session: EntityManager): Map<String, List<String>> {
|
fun bootstrapContractStateTypes(): MutableMap<String, MutableSet<String>> {
|
||||||
val criteria = criteriaBuilder.createQuery(String::class.java)
|
val criteria = criteriaBuilder.createQuery(String::class.java)
|
||||||
val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java)
|
val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java)
|
||||||
criteria.select(vaultStates.get("contractStateClassName")).distinct(true)
|
criteria.select(vaultStates.get("contractStateClassName")).distinct(true)
|
||||||
|
val session = getSession()
|
||||||
|
session.use {
|
||||||
val query = session.createQuery(criteria)
|
val query = session.createQuery(criteria)
|
||||||
val results = query.resultList
|
val results = query.resultList
|
||||||
val distinctTypes = results.map { it }
|
val distinctTypes = results.map { it }
|
||||||
|
|
||||||
val contractInterfaceToConcreteTypes = mutableMapOf<String, MutableList<String>>()
|
val contractInterfaceToConcreteTypes = mutableMapOf<String, MutableSet<String>>()
|
||||||
distinctTypes.forEach { it ->
|
distinctTypes.forEach { type ->
|
||||||
@Suppress("UNCHECKED_CAST")
|
@Suppress("UNCHECKED_CAST")
|
||||||
val concreteType = Class.forName(it) as Class<ContractState>
|
val concreteType = Class.forName(type) as Class<ContractState>
|
||||||
val contractInterfaces = deriveContractInterfaces(concreteType)
|
val contractInterfaces = deriveContractInterfaces(concreteType)
|
||||||
contractInterfaces.map {
|
contractInterfaces.map {
|
||||||
val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name, { mutableListOf() })
|
val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name, { mutableSetOf() })
|
||||||
contractInterface.add(concreteType.name)
|
contractInterface.add(concreteType.name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return contractInterfaceToConcreteTypes
|
return contractInterfaceToConcreteTypes
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun <T : ContractState> deriveContractInterfaces(clazz: Class<T>): Set<Class<T>> {
|
private fun <T : ContractState> deriveContractInterfaces(clazz: Class<T>): Set<Class<T>> {
|
||||||
val myInterfaces: MutableSet<Class<T>> = mutableSetOf()
|
val myInterfaces: MutableSet<Class<T>> = mutableSetOf()
|
||||||
|
@ -235,7 +235,7 @@ fun makeTestDatabaseAndMockServices(customSchemas: Set<MappedSchema> = setOf(Com
|
|||||||
vaultService.notifyAll(txs.map { it.tx })
|
vaultService.notifyAll(txs.map { it.tx })
|
||||||
}
|
}
|
||||||
|
|
||||||
override val vaultQueryService: VaultQueryService = HibernateVaultQueryImpl(hibernateConfig, vaultService.updatesPublisher)
|
override val vaultQueryService: VaultQueryService = HibernateVaultQueryImpl(hibernateConfig, vaultService)
|
||||||
|
|
||||||
override fun jdbcSession(): Connection = database.createSession()
|
override fun jdbcSession(): Connection = database.createSession()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user