From ba3695a3330e518bbad99012f48fd2e74f14835f Mon Sep 17 00:00:00 2001
From: Rick Parker
Date: Tue, 7 Aug 2018 17:46:12 +0100
Subject: [PATCH] ENT-2376 Reduce database queries during publishing of consumed states (#1317)

---
 .../net/corda/node/internal/AbstractNode.kt   |   2 +-
 .../node/services/config/NodeConfiguration.kt |   2 +-
 .../persistence/DBCheckpointStorage.kt        |   3 +-
 .../node/services/vault/NodeVaultService.kt   | 108 +++++++++++++-----
 4 files changed, 82 insertions(+), 33 deletions(-)

diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
index 77e144dcb4..5b6174759c 100644
--- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
@@ -906,7 +906,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
 
     protected open fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, database: CordaPersistence): VaultServiceInternal {
-        return NodeVaultService(platformClock, keyManagementService, services, database, schemaService)
+        return NodeVaultService(platformClock, keyManagementService, services, database, schemaService, configuration.transactionCacheSizeBytes)
     }
 
     /** Load configured JVM agents */
diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt
index 4370254c5b..44df33f1a6 100644
--- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt
+++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt
@@ -29,7 +29,6 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigT
 import net.corda.nodeapi.internal.persistence.DatabaseConfig
 import net.corda.tools.shell.SSHDConfiguration
 import org.slf4j.Logger
-import sun.security.x509.X500Name
 import java.net.URL
 import java.nio.file.Path
 import java.time.Duration
@@ -37,6 +36,7 @@ import java.util.*
 import javax.security.auth.x500.X500Principal
 
 val Int.MB: Long get() = this * 1024L * 1024L
+val Int.KB: Long get() = this * 1024L
 
 private val DEFAULT_FLOW_MONITOR_PERIOD_MILLIS: Duration = Duration.ofMinutes(1)
 private val DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS: Duration = Duration.ofMinutes(1)
diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
index 8c6a1efc3e..97d3665676 100644
--- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
+++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
@@ -18,9 +18,9 @@ import net.corda.node.services.statemachine.Checkpoint
 import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
 import net.corda.nodeapi.internal.persistence.currentDBSession
 import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
+import org.hibernate.annotations.Type
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
-import java.io.Serializable
 import java.sql.Connection
 import java.sql.SQLException
 import java.util.*
@@ -28,7 +28,6 @@ import java.util.stream.Stream
 import javax.persistence.Column
 import javax.persistence.Entity
 import javax.persistence.Id
-import org.hibernate.annotations.Type
 
 /**
  * Simple checkpoint key value storage in DB.
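The two configuration changes above are what feed the new cache: AbstractNode now hands the node's transactionCacheSizeBytes to the vault service, and NodeConfiguration gains an Int.KB helper alongside the existing Int.MB one. A minimal standalone sketch of the sizing arithmetic follows; it is not part of the patch, and the 8 MB figure is an assumed default chosen to match the 1024-entry minimum mentioned in the NodeVaultService comment later in the diff:

    // Not part of the patch: shows how the byte-size helpers combine to size the new vault cache.
    val Int.MB: Long get() = this * 1024L * 1024L  // existing helper in NodeConfiguration.kt
    val Int.KB: Long get() = this * 1024L          // helper added by this patch

    fun main() {
        val transactionCacheSizeBytes = 8.MB                 // assumed default transaction cache size
        val cacheEntries = transactionCacheSizeBytes / 8.KB  // one entry per 8KB of transaction cache
        println(cacheEntries)                                // prints 1024
    }
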
diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt
index ef8f836f91..899aa035e0 100644
--- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt
@@ -12,13 +12,17 @@ package net.corda.node.services.vault
 
 import co.paralleluniverse.fibers.Suspendable
 import co.paralleluniverse.strands.Strand
+import com.github.benmanes.caffeine.cache.Caffeine
 import net.corda.core.contracts.*
 import net.corda.core.crypto.SecureHash
 import net.corda.core.internal.*
 import net.corda.core.messaging.DataFeed
 import net.corda.core.node.ServicesForResolution
 import net.corda.core.node.StatesToRecord
-import net.corda.core.node.services.*
+import net.corda.core.node.services.KeyManagementService
+import net.corda.core.node.services.StatesNotAvailableException
+import net.corda.core.node.services.Vault
+import net.corda.core.node.services.VaultQueryException
 import net.corda.core.node.services.vault.*
 import net.corda.core.schemas.PersistentStateRef
 import net.corda.core.serialization.SingletonSerializeAsToken
@@ -26,6 +30,8 @@ import net.corda.core.transactions.*
 import net.corda.core.utilities.*
 import net.corda.node.services.api.SchemaService
 import net.corda.node.services.api.VaultServiceInternal
+import net.corda.node.services.config.KB
+import net.corda.node.services.config.NodeConfiguration
 import net.corda.node.services.schema.PersistentStateService
 import net.corda.node.services.statemachine.FlowStateMachineImpl
 import net.corda.nodeapi.internal.persistence.CordaPersistence
@@ -57,14 +63,14 @@ private fun CriteriaBuilder.executeUpdate(session: Session, configure: Root<*>.(
  * this is not the case.
  *
  * TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time.
- * TODO: have transaction storage do some caching.
  */
 class NodeVaultService(
         private val clock: Clock,
         private val keyManagementService: KeyManagementService,
         private val servicesForResolution: ServicesForResolution,
         private val database: CordaPersistence,
-        private val schemaService: SchemaService
+        private val schemaService: SchemaService,
+        transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize
 ) : SingletonSerializeAsToken(), VaultServiceInternal {
     private companion object {
         private val log = contextLogger()
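The only API change to NodeVaultService is the new transactionCacheSizeBytes constructor parameter, defaulting to NodeConfiguration.defaultTransactionCacheSize. It exists purely to size the Caffeine cache introduced in the next hunk. A self-contained approximation of that cache is sketched below, assuming only the Caffeine import added above; the ProducedStatesCache class name and its helper methods are illustrative stand-ins, not the patch's actual field:

    import com.github.benmanes.caffeine.cache.Cache
    import com.github.benmanes.caffeine.cache.Caffeine
    import net.corda.core.crypto.SecureHash
    import java.util.BitSet

    // Approximation of the per-transaction "produced states" cache:
    // one entry per 8KB of the node's transaction cache budget.
    class ProducedStatesCache(transactionCacheSizeBytes: Long) {
        private val cache: Cache<SecureHash, BitSet> = Caffeine.newBuilder()
                .maximumSize(transactionCacheSizeBytes / (8 * 1024L))
                .build()

        /** Record which output indices of [txId] ended up in the vault. */
        fun put(txId: SecureHash, producedOutputs: BitSet) = cache.put(txId, producedOutputs)

        /** Return the recorded output indices, or null if this transaction has not been seen. */
        fun get(txId: SecureHash): BitSet? = cache.getIfPresent(txId)
    }

Keeping the value a compact BitSet of output indices rather than a list of state objects keeps each entry small, which is what makes the byte-budget-to-entry-count sizing a reasonable heuristic.
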
@@ -89,6 +95,12 @@ class NodeVaultService(
      */
     private val contractStateTypeMappings = mutableMapOf<String, MutableSet<String>>()
 
+    /**
+     * Caches which of a transaction's states are in the vault. The cache is sized at one entry per 8KB of
+     * transaction cache, which results in a minimum of 1024 entries.
+     */
+    private val producedStatesMapping = Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).build<SecureHash, BitSet>()
+
     override fun start() {
         criteriaBuilder = database.hibernateConfig.sessionFactoryForRegisteredSchemas.criteriaBuilder
         bootstrapContractStateTypes()
@@ -136,19 +148,19 @@ class NodeVaultService(
                 state.stateRef = PersistentStateRef(stateAndRef.key)
                 session.save(state)
             }
-            consumedStateRefs.forEach { stateRef ->
-                val state = session.get(VaultSchemaV1.VaultStates::class.java, PersistentStateRef(stateRef))
-                state?.run {
-                    stateStatus = Vault.StateStatus.CONSUMED
-                    consumedTime = now
-                    // remove lock (if held)
-                    if (lockId != null) {
-                        lockId = null
-                        lockUpdateTime = now
-                        log.trace("Releasing soft lock on consumed state: $stateRef")
-                    }
-                    session.save(state)
-                }
+            if (consumedStateRefs.isNotEmpty()) {
+                // We have to flush and clear the session first because it is not aware of this bulk update query
+                // and would otherwise hold onto the prior version of the states' status.
+                session.flush()
+                session.clear()
+                val criteriaBuilder = session.criteriaBuilder
+                val updateQuery = criteriaBuilder.createCriteriaUpdate(VaultSchemaV1.VaultStates::class.java)
+                val root = updateQuery.from(VaultSchemaV1.VaultStates::class.java)
+                updateQuery.set(root.get<Vault.StateStatus>(VaultSchemaV1.VaultStates::stateStatus.name), Vault.StateStatus.CONSUMED)
+                updateQuery.set(root.get<Instant>(VaultSchemaV1.VaultStates::consumedTime.name), now)
+                updateQuery.set(root.get<String>(VaultSchemaV1.VaultStates::lockId.name), criteriaBuilder.nullLiteral(String::class.java))
+                updateQuery.where(root.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name).`in`(consumedStateRefs.map { PersistentStateRef(it) }))
+                session.createQuery(updateQuery).executeUpdate()
             }
         }
         return update
@@ -162,7 +174,10 @@ class NodeVaultService(
     /** Groups adjacent transactions into batches to generate separate net updates per transaction type. */
     override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) {
-        if (statesToRecord == StatesToRecord.NONE || !txns.any()) return
+        if (statesToRecord == StatesToRecord.NONE || !txns.any()) {
+            txns.forEach { producedStatesMapping.put(it.id, BitSet(0)) }
+            return
+        }
 
         val batch = mutableListOf<CoreTransaction>()
 
         fun flushBatch() {
@@ -182,16 +197,21 @@ class NodeVaultService(
 
     private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord): List<Vault.Update<ContractState>> {
         fun makeUpdate(tx: WireTransaction): Vault.Update<ContractState>? {
+            val outputsBitSet = BitSet(tx.outputs.size)
             val ourNewStates = when (statesToRecord) {
                 StatesToRecord.NONE -> throw AssertionError("Should not reach here")
                 StatesToRecord.ONLY_RELEVANT -> tx.outputs.withIndex().filter { isRelevant(it.value.data, keyManagementService.filterMyKeys(tx.outputs.flatMap { it.data.participants.map { it.owningKey } }).toSet()) }
                 StatesToRecord.ALL_VISIBLE -> tx.outputs.withIndex()
-            }.map { tx.outRef<ContractState>(it.index) }
+            }.map {
+                outputsBitSet[it.index] = true
+                tx.outRef<ContractState>(it.index)
+            }
+            producedStatesMapping.put(tx.id, outputsBitSet)
 
             // Retrieve all unconsumed states for this transaction's inputs
-            val consumedStates = loadStates(tx.inputs)
+            val consumedStates = loadStatesWithVaultFilter(tx.inputs)
 
             // Is transaction irrelevant?
             if (consumedStates.isEmpty() && ourNewStates.isEmpty()) {
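Two things happen in the hunks above: consumed states are now marked with a single criteria UPDATE instead of a SELECT-then-UPDATE per state, and makeUpdate records which output indices it wrote to the vault so later lookups can be answered from the cache. Stripped of the vault specifics, the bulk-update pattern looks like the sketch below; VaultStateEntity and its columns are hypothetical stand-ins for VaultSchemaV1.VaultStates:

    import java.time.Instant
    import javax.persistence.Entity
    import javax.persistence.Id
    import org.hibernate.Session

    // Hypothetical stand-in for VaultSchemaV1.VaultStates.
    @Entity
    class VaultStateEntity {
        @Id var stateRef: String = ""
        var stateStatus: String = "UNCONSUMED"
        var consumedTime: Instant? = null
        var lockId: String? = null
    }

    // Mark every state in [consumedRefs] consumed with one UPDATE statement rather than 2 * N queries.
    fun markConsumed(session: Session, consumedRefs: List<String>, now: Instant) {
        if (consumedRefs.isEmpty()) return
        // A bulk update bypasses the persistence context, so flush pending writes and clear the
        // session to stop it holding stale copies of these rows.
        session.flush()
        session.clear()
        val cb = session.criteriaBuilder
        val update = cb.createCriteriaUpdate(VaultStateEntity::class.java)
        val root = update.from(VaultStateEntity::class.java)
        update.set(root.get<String>("stateStatus"), "CONSUMED")
        update.set(root.get<Instant>("consumedTime"), now)
        update.set(root.get<String>("lockId"), cb.nullLiteral(String::class.java))  // release any soft lock
        update.where(root.get<String>("stateRef").`in`(consumedRefs))
        session.createQuery(update).executeUpdate()
    }

Because the update is issued as a single statement, the database sees one round trip no matter how many states a transaction consumes, which is where the reduction in queries in this commit's title comes from.
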
@@ -240,19 +260,49 @@ class NodeVaultService(
         }
     }
 
-    private fun loadStates(refs: Collection<StateRef>): Collection<StateAndRef<ContractState>> {
-        val states = mutableListOf<StateAndRef<ContractState>>()
+    private fun loadStatesWithVaultFilter(refs: Collection<StateRef>): Collection<StateAndRef<ContractState>> {
+        val states = mutableSetOf<StateRef>()
         if (refs.isNotEmpty()) {
-            val refsList = refs.toList()
-            val pageSize = PageSpecification().pageSize
-            (0..(refsList.size - 1) / pageSize).forEach {
-                val offset = it * pageSize
-                val limit = minOf(offset + pageSize, refsList.size)
-                val page = queryBy<ContractState>(QueryCriteria.VaultQueryCriteria(stateRefs = refsList.subList(offset, limit))).states
-                states.addAll(page)
+            // Eliminate any that we have cached.
+            val uncachedTx = refs.groupBy { it.txhash }.filter {
+                val cachedProduced = producedStatesMapping.getIfPresent(it.key)
+                if (cachedProduced == null) {
+                    true
+                } else {
+                    // Add the cached hits to the results.
+                    states.addAll(it.value.filter { cachedProduced[it.index] })
+                    false
+                }
+            }
+            // If some transactions were not cached, fetch those in a single query.
+            if (uncachedTx.isNotEmpty()) {
+                // Select all rows with matching stateRefs.
+                val criteriaQuery = criteriaBuilder.createTupleQuery()
+                val root = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java)
+                criteriaQuery.multiselect(root.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name))
+                val txIds = uncachedTx.keys.map { it.toString() }
+                val compositeKey = root.get<PersistentStateRef>(VaultSchemaV1.VaultStates::stateRef.name).get<String>(PersistentStateRef::txId.name)
+                criteriaQuery.where(compositeKey.`in`(txIds))
+
+                // Prepare the query for execution.
+                val session = currentDBSession()
+                val query = session.createQuery(criteriaQuery)
+
+                // Execute, then for each transaction:
+                query.resultList.map { it[0] as PersistentStateRef }.groupBy { it.txId }.forEach {
+                    // Record which states were found, in both the cache and the results.
+                    val secureHash = SecureHash.parse(it.key)
+                    val outputsBitSet = BitSet(0) // This is auto-expanded when setting higher bits.
+                    states.addAll(it.value.map {
+                        outputsBitSet[it.index] = true
+                        StateRef(secureHash, it.index)
+                    })
+                    // Cache the result for future lookups.
+                    producedStatesMapping.put(secureHash, outputsBitSet)
+                }
             }
         }
-        return states
+        return servicesForResolution.loadStates(states)
     }
 
     private fun processAndNotify(updates: List<Vault.Update<ContractState>>) {
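loadStatesWithVaultFilter is the consumer of the cache: it answers what it can from the per-transaction BitSets, issues at most one query for the remaining transactions, back-fills the cache as it goes, and only then hands the surviving StateRefs to servicesForResolution.loadStates. The same cache-first shape, detached from Hibernate and Corda types, is sketched below; Ref, lookupInVault and the plain MutableMap cache are illustrative stand-ins, not the patch's actual types:

    import java.util.BitSet

    // Minimal stand-in for Corda's StateRef (transaction id plus output index).
    data class Ref(val txId: String, val index: Int)

    class VaultRefResolver(
            private val cache: MutableMap<String, BitSet>,                      // txId -> outputs known to be in the vault
            private val lookupInVault: (Set<String>) -> Map<String, List<Int>>  // one query covering all uncached txIds
    ) {
        fun resolve(refs: Collection<Ref>): Set<Ref> {
            val found = mutableSetOf<Ref>()
            // 1. Answer from the cache where possible, remembering which transactions were misses.
            val uncached = refs.groupBy { it.txId }.filter { (txId, txRefs) ->
                val produced = cache[txId]
                if (produced == null) {
                    true
                } else {
                    found += txRefs.filter { produced[it.index] }
                    false
                }
            }
            // 2. A single round trip for all the misses, then back-fill the cache for next time.
            if (uncached.isNotEmpty()) {
                lookupInVault(uncached.keys).forEach { (txId, indices) ->
                    val produced = BitSet()
                    indices.forEach { produced.set(it) }
                    found += indices.map { Ref(txId, it) }
                    cache[txId] = produced
                }
            }
            return found
        }
    }

Compared with the old loadStates, which paged through full vault queries on every call, repeated notifications that touch the same transactions now avoid re-querying the vault entirely.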