mirror of
https://github.com/corda/corda.git
synced 2025-03-16 00:55:24 +00:00
ENT-2376 Reduce database queries during publishing of consumed states (#1317)
This commit is contained in:
parent
4f3e4f03ff
commit
ba3695a333
@ -906,7 +906,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
protected open fun makeVaultService(keyManagementService: KeyManagementService,
|
||||
services: ServicesForResolution,
|
||||
database: CordaPersistence): VaultServiceInternal {
|
||||
return NodeVaultService(platformClock, keyManagementService, services, database, schemaService)
|
||||
return NodeVaultService(platformClock, keyManagementService, services, database, schemaService, configuration.transactionCacheSizeBytes)
|
||||
}
|
||||
|
||||
/** Load configured JVM agents */
|
||||
|
@ -29,7 +29,6 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigT
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.tools.shell.SSHDConfiguration
|
||||
import org.slf4j.Logger
|
||||
import sun.security.x509.X500Name
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
@ -37,6 +36,7 @@ import java.util.*
|
||||
import javax.security.auth.x500.X500Principal
|
||||
|
||||
val Int.MB: Long get() = this * 1024L * 1024L
|
||||
val Int.KB: Long get() = this * 1024L
|
||||
|
||||
private val DEFAULT_FLOW_MONITOR_PERIOD_MILLIS: Duration = Duration.ofMinutes(1)
|
||||
private val DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS: Duration = Duration.ofMinutes(1)
|
||||
|
@ -18,9 +18,9 @@ import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import org.hibernate.annotations.Type
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.Serializable
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
@ -28,7 +28,6 @@ import java.util.stream.Stream
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
import org.hibernate.annotations.Type
|
||||
|
||||
/**
|
||||
* Simple checkpoint key value storage in DB.
|
||||
|
@ -12,13 +12,17 @@ package net.corda.node.services.vault
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.ServicesForResolution
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.node.services.StatesNotAvailableException
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultQueryException
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -26,6 +30,8 @@ import net.corda.core.transactions.*
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.api.VaultServiceInternal
|
||||
import net.corda.node.services.config.KB
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.schema.PersistentStateService
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -57,14 +63,14 @@ private fun CriteriaBuilder.executeUpdate(session: Session, configure: Root<*>.(
|
||||
* this is not the case.
|
||||
*
|
||||
* TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time.
|
||||
* TODO: have transaction storage do some caching.
|
||||
*/
|
||||
class NodeVaultService(
|
||||
private val clock: Clock,
|
||||
private val keyManagementService: KeyManagementService,
|
||||
private val servicesForResolution: ServicesForResolution,
|
||||
private val database: CordaPersistence,
|
||||
private val schemaService: SchemaService
|
||||
private val schemaService: SchemaService,
|
||||
transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize
|
||||
) : SingletonSerializeAsToken(), VaultServiceInternal {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
@ -89,6 +95,12 @@ class NodeVaultService(
|
||||
*/
|
||||
private val contractStateTypeMappings = mutableMapOf<String, MutableSet<String>>()
|
||||
|
||||
/**
|
||||
* This caches what states are in the vault for a particular transaction. Size the cache based on one entry per 8KB of transaction cache.
|
||||
* This size results in minimum of 1024.
|
||||
*/
|
||||
private val producedStatesMapping = Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).build<SecureHash, BitSet>()
|
||||
|
||||
override fun start() {
|
||||
criteriaBuilder = database.hibernateConfig.sessionFactoryForRegisteredSchemas.criteriaBuilder
|
||||
bootstrapContractStateTypes()
|
||||
@ -136,19 +148,19 @@ class NodeVaultService(
|
||||
state.stateRef = PersistentStateRef(stateAndRef.key)
|
||||
session.save(state)
|
||||
}
|
||||
consumedStateRefs.forEach { stateRef ->
|
||||
val state = session.get<VaultSchemaV1.VaultStates>(VaultSchemaV1.VaultStates::class.java, PersistentStateRef(stateRef))
|
||||
state?.run {
|
||||
stateStatus = Vault.StateStatus.CONSUMED
|
||||
consumedTime = now
|
||||
// remove lock (if held)
|
||||
if (lockId != null) {
|
||||
lockId = null
|
||||
lockUpdateTime = now
|
||||
log.trace("Releasing soft lock on consumed state: $stateRef")
|
||||
}
|
||||
session.save(state)
|
||||
}
|
||||
if (consumedStateRefs.isNotEmpty()) {
|
||||
// We have to do this so that the session does not hold onto the prior version of the states status. i.e.
|
||||
// it is not aware of this query.
|
||||
session.flush()
|
||||
session.clear()
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val updateQuery = criteriaBuilder.createCriteriaUpdate(VaultSchemaV1.VaultStates::class.java)
|
||||
val root = updateQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
updateQuery.set(root.get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.CONSUMED)
|
||||
updateQuery.set(root.get<Instant>(VaultSchemaV1.VaultStates::consumedTime.name), now)
|
||||
updateQuery.set(root.get<String>(VaultSchemaV1.VaultStates::lockId.name), criteriaBuilder.nullLiteral(String::class.java))
|
||||
updateQuery.where(root.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name).`in`(consumedStateRefs.map { PersistentStateRef(it) }))
|
||||
session.createQuery(updateQuery).executeUpdate()
|
||||
}
|
||||
}
|
||||
return update
|
||||
@ -162,7 +174,10 @@ class NodeVaultService(
|
||||
|
||||
/** Groups adjacent transactions into batches to generate separate net updates per transaction type. */
|
||||
override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) {
|
||||
if (statesToRecord == StatesToRecord.NONE || !txns.any()) return
|
||||
if (statesToRecord == StatesToRecord.NONE || !txns.any()) {
|
||||
txns.forEach { producedStatesMapping.put(it.id, BitSet(0)) }
|
||||
return
|
||||
}
|
||||
val batch = mutableListOf<CoreTransaction>()
|
||||
|
||||
fun flushBatch() {
|
||||
@ -182,16 +197,21 @@ class NodeVaultService(
|
||||
|
||||
private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord): List<Vault.Update<ContractState>> {
|
||||
fun makeUpdate(tx: WireTransaction): Vault.Update<ContractState>? {
|
||||
val outputsBitSet = BitSet(tx.outputs.size)
|
||||
val ourNewStates = when (statesToRecord) {
|
||||
StatesToRecord.NONE -> throw AssertionError("Should not reach here")
|
||||
StatesToRecord.ONLY_RELEVANT -> tx.outputs.withIndex().filter {
|
||||
isRelevant(it.value.data, keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } }).toSet())
|
||||
}
|
||||
StatesToRecord.ALL_VISIBLE -> tx.outputs.withIndex()
|
||||
}.map { tx.outRef<ContractState>(it.index) }
|
||||
}.map {
|
||||
outputsBitSet[it.index] = true
|
||||
tx.outRef<ContractState>(it.index)
|
||||
}
|
||||
producedStatesMapping.put(tx.id, outputsBitSet)
|
||||
|
||||
// Retrieve all unconsumed states for this transaction's inputs
|
||||
val consumedStates = loadStates(tx.inputs)
|
||||
val consumedStates = loadStatesWithVaultFilter(tx.inputs)
|
||||
|
||||
// Is transaction irrelevant?
|
||||
if (consumedStates.isEmpty() && ourNewStates.isEmpty()) {
|
||||
@ -240,19 +260,49 @@ class NodeVaultService(
|
||||
}
|
||||
}
|
||||
|
||||
private fun loadStates(refs: Collection<StateRef>): Collection<StateAndRef<ContractState>> {
|
||||
val states = mutableListOf<StateAndRef<ContractState>>()
|
||||
private fun loadStatesWithVaultFilter(refs: Collection<StateRef>): Collection<StateAndRef<ContractState>> {
|
||||
val states = mutableSetOf<StateRef>()
|
||||
if (refs.isNotEmpty()) {
|
||||
val refsList = refs.toList()
|
||||
val pageSize = PageSpecification().pageSize
|
||||
(0..(refsList.size - 1) / pageSize).forEach {
|
||||
val offset = it * pageSize
|
||||
val limit = minOf(offset + pageSize, refsList.size)
|
||||
val page = queryBy<ContractState>(QueryCriteria.VaultQueryCriteria(stateRefs = refsList.subList(offset, limit))).states
|
||||
states.addAll(page)
|
||||
// Eliminate any that we have cached.
|
||||
val uncachedTx = refs.groupBy { it.txhash }.filter {
|
||||
val cachedProduced = producedStatesMapping.getIfPresent(it.key)
|
||||
if (cachedProduced == null) {
|
||||
true
|
||||
} else {
|
||||
// Add to results.
|
||||
states.addAll(it.value.filter { cachedProduced[it.index] })
|
||||
false
|
||||
}
|
||||
}
|
||||
// If we have uncached, go and get those in a single query
|
||||
if (uncachedTx.isNotEmpty()) {
|
||||
// Select all rows with matching stateRefs
|
||||
val criteriaQuery = criteriaBuilder.createTupleQuery()
|
||||
val root = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
|
||||
criteriaQuery.multiselect(root.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name))
|
||||
val txIds = uncachedTx.keys.map { it.toString() }
|
||||
val compositeKey = root.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name).get<String>(PersistentStateRef::txId.name)
|
||||
criteriaQuery.where(compositeKey.`in`(txIds))
|
||||
|
||||
// prepare query for execution
|
||||
val session = currentDBSession()
|
||||
val query = session.createQuery(criteriaQuery)
|
||||
|
||||
// execution. For each transaction:
|
||||
query.resultList.map { it[0] as PersistentStateRef }.groupBy { it.txId }.forEach {
|
||||
// Record what states were found, in the cache and the results.
|
||||
val secureHash = SecureHash.parse(it.key)
|
||||
val outputsBitSet = BitSet(0) // This is auto-expanded when setting higher bits.
|
||||
states.addAll(it.value.map {
|
||||
outputsBitSet[it.index] = true
|
||||
StateRef(secureHash, it.index)
|
||||
})
|
||||
// Cache the result for future lookups.
|
||||
producedStatesMapping.put(secureHash, outputsBitSet)
|
||||
}
|
||||
}
|
||||
}
|
||||
return states
|
||||
return servicesForResolution.loadStates(states)
|
||||
}
|
||||
|
||||
private fun processAndNotify(updates: List<Vault.Update<ContractState>>) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user