From ed08b2c5de6dd028a9d33cc078da71e971b1bd14 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Thu, 25 May 2023 13:01:42 +0100 Subject: [PATCH] ENT-9793: Added Page.previousPageAnchor to allow detection of vault changes whilst pages are loaded --- .../corda/core/node/services/VaultService.kt | 72 ++++-- .../net/corda/node/internal/AbstractNode.kt | 2 +- .../internal/NodeServicesForResolution.kt | 15 ++ .../internal/ServicesForResolutionImpl.kt | 23 +- .../node/migration/VaultStateMigration.kt | 5 +- .../PersistentScheduledFlowRepository.kt | 7 +- .../PersistentUniquenessProvider.kt | 9 +- .../node/services/vault/NodeVaultService.kt | 244 +++++++++++------- .../corda/node/services/vault/VaultSchema.kt | 18 ++ .../bftsmart/BFTSmartNotaryService.kt | 10 +- .../corda/notary/jpa/JPAUniquenessProvider.kt | 11 +- .../persistence/HibernateConfigurationTest.kt | 22 +- .../node/services/vault/VaultQueryTests.kt | 92 +++++-- .../vault/VaultSoftLockManagerTest.kt | 4 +- .../net/corda/testing/node/MockServices.kt | 10 +- 15 files changed, 366 insertions(+), 178 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/internal/NodeServicesForResolution.kt diff --git a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt index 5993d60587..1913e6bf84 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt @@ -1,10 +1,23 @@ +@file:Suppress("LongParameterList") + package net.corda.core.node.services import co.paralleluniverse.fibers.Suspendable import net.corda.core.DeleteForDJVM import net.corda.core.DoNotImplement import net.corda.core.concurrent.CordaFuture -import net.corda.core.contracts.* +import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint +import net.corda.core.contracts.Amount +import net.corda.core.contracts.AttachmentConstraint +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.FungibleAsset +import net.corda.core.contracts.FungibleState +import net.corda.core.contracts.HashAttachmentConstraint +import net.corda.core.contracts.ReferencedStateAndRef +import net.corda.core.contracts.SignatureAttachmentConstraint +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowException @@ -13,9 +26,16 @@ import net.corda.core.identity.AbstractParty import net.corda.core.internal.MAX_NUMBER_OF_KEYS_IN_SIGNATURE_CONSTRAINT import net.corda.core.internal.concurrent.doneFuture import net.corda.core.messaging.DataFeed -import net.corda.core.node.services.Vault.RelevancyStatus.* +import net.corda.core.node.StatesToRecord +import net.corda.core.node.services.Vault.RelevancyStatus.ALL +import net.corda.core.node.services.Vault.RelevancyStatus.NOT_RELEVANT +import net.corda.core.node.services.Vault.RelevancyStatus.RELEVANT import net.corda.core.node.services.Vault.StateStatus -import net.corda.core.node.services.vault.* +import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE +import net.corda.core.node.services.vault.MAX_PAGE_SIZE +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.node.services.vault.Sort import net.corda.core.serialization.CordaSerializable import net.corda.core.toFuture import net.corda.core.transactions.LedgerTransaction @@ -196,16 +216,28 @@ class Vault(val states: Iterable>) { * otherwise it defaults to -1. * 4) Status types used in this query: [StateStatus.UNCONSUMED], [StateStatus.CONSUMED], [StateStatus.ALL]. * 5) Other results as a [List] of any type (eg. aggregate function results with/without group by). + * 6) A [StateRef] pointing to the last state of the previous page. Use this to detect if the database has changed whilst loading pages + * by checking it matches your last loaded state. * - * Note: currently otherResults are used only for Aggregate Functions (in which case, the states and statesMetadata - * results will be empty). + * Note: currently [otherResults] is used only for aggregate functions (in which case, [states] and [statesMetadata] will be empty). */ @CordaSerializable - data class Page(val states: List>, - val statesMetadata: List, - val totalStatesAvailable: Long, - val stateTypes: StateStatus, - val otherResults: List) + data class Page @JvmOverloads constructor( + val states: List>, + val statesMetadata: List, + val totalStatesAvailable: Long, + val stateTypes: StateStatus, + val otherResults: List, + val previousPageAnchor: StateRef? = null + ) { + fun copy(states: List> = this.states, + statesMetadata: List = this.statesMetadata, + totalStatesAvailable: Long = this.totalStatesAvailable, + stateTypes: StateStatus = this.stateTypes, + otherResults: List = this.otherResults): Page { + return Page(states, statesMetadata, totalStatesAvailable, stateTypes, otherResults, null) + } + } @CordaSerializable data class StateMetadata @JvmOverloads constructor( @@ -213,11 +245,11 @@ class Vault(val states: Iterable>) { val contractStateClassName: String, val recordedTime: Instant, val consumedTime: Instant?, - val status: Vault.StateStatus, + val status: StateStatus, val notary: AbstractParty?, val lockId: String?, val lockUpdateTime: Instant?, - val relevancyStatus: Vault.RelevancyStatus? = null, + val relevancyStatus: RelevancyStatus? = null, val constraintInfo: ConstraintInfo? = null ) { fun copy( @@ -225,7 +257,7 @@ class Vault(val states: Iterable>) { contractStateClassName: String = this.contractStateClassName, recordedTime: Instant = this.recordedTime, consumedTime: Instant? = this.consumedTime, - status: Vault.StateStatus = this.status, + status: StateStatus = this.status, notary: AbstractParty? = this.notary, lockId: String? = this.lockId, lockUpdateTime: Instant? = this.lockUpdateTime @@ -237,11 +269,11 @@ class Vault(val states: Iterable>) { contractStateClassName: String = this.contractStateClassName, recordedTime: Instant = this.recordedTime, consumedTime: Instant? = this.consumedTime, - status: Vault.StateStatus = this.status, + status: StateStatus = this.status, notary: AbstractParty? = this.notary, lockId: String? = this.lockId, lockUpdateTime: Instant? = this.lockUpdateTime, - relevancyStatus: Vault.RelevancyStatus? + relevancyStatus: RelevancyStatus? ): StateMetadata { return StateMetadata(ref, contractStateClassName, recordedTime, consumedTime, status, notary, lockId, lockUpdateTime, relevancyStatus, ConstraintInfo(AlwaysAcceptAttachmentConstraint)) } @@ -249,9 +281,9 @@ class Vault(val states: Iterable>) { companion object { @Deprecated("No longer used. The vault does not emit empty updates") - val NoUpdate = Update(emptySet(), emptySet(), type = Vault.UpdateType.GENERAL, references = emptySet()) + val NoUpdate = Update(emptySet(), emptySet(), type = UpdateType.GENERAL, references = emptySet()) @Deprecated("No longer used. The vault does not emit empty updates") - val NoNotaryUpdate = Vault.Update(emptySet(), emptySet(), type = Vault.UpdateType.NOTARY_CHANGE, references = emptySet()) + val NoNotaryUpdate = Update(emptySet(), emptySet(), type = UpdateType.NOTARY_CHANGE, references = emptySet()) } } @@ -302,7 +334,7 @@ interface VaultService { fun whenConsumed(ref: StateRef): CordaFuture> { val query = QueryCriteria.VaultQueryCriteria( stateRefs = listOf(ref), - status = Vault.StateStatus.CONSUMED + status = StateStatus.CONSUMED ) val result = trackBy(query) val snapshot = result.snapshot.states @@ -358,8 +390,8 @@ interface VaultService { /** * Helper function to determine spendable states and soft locking them. * Currently performance will be worse than for the hand optimised version in - * [Cash.unconsumedCashStatesForSpending]. However, this is fully generic and can operate with custom [FungibleState] - * and [FungibleAsset] states. + * [net.corda.finance.workflows.asset.selection.AbstractCashSelection.unconsumedCashStatesForSpending]. However, this is fully generic + * and can operate with custom [FungibleState] and [FungibleAsset] states. * @param lockId The [FlowLogic.runId]'s [UUID] of the current flow used to soft lock the states. * @param eligibleStatesQuery A custom query object that selects down to the appropriate subset of all states of the * [contractStateType]. e.g. by selecting on account, issuer, etc. The query is internally augmented with the diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index eb781dd3f7..5fbbde1296 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -1181,7 +1181,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, networkParameters: NetworkParameters) protected open fun makeVaultService(keyManagementService: KeyManagementService, - services: ServicesForResolution, + services: NodeServicesForResolution, database: CordaPersistence, cordappLoader: CordappLoader): VaultServiceInternal { return NodeVaultService(platformClock, keyManagementService, services, database, schemaService, cordappLoader.appClassLoader) diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeServicesForResolution.kt b/node/src/main/kotlin/net/corda/node/internal/NodeServicesForResolution.kt new file mode 100644 index 0000000000..5baa528297 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/NodeServicesForResolution.kt @@ -0,0 +1,15 @@ +package net.corda.node.internal + +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionResolutionException +import net.corda.core.node.ServicesForResolution +import java.util.LinkedHashSet + +interface NodeServicesForResolution : ServicesForResolution { + @Throws(TransactionResolutionException::class) + override fun loadStates(stateRefs: Set): Set> = loadStates(stateRefs, LinkedHashSet()) + + fun >> loadStates(input: Iterable, output: C): C +} diff --git a/node/src/main/kotlin/net/corda/node/internal/ServicesForResolutionImpl.kt b/node/src/main/kotlin/net/corda/node/internal/ServicesForResolutionImpl.kt index f5836c0cc5..ffea11f536 100644 --- a/node/src/main/kotlin/net/corda/node/internal/ServicesForResolutionImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/ServicesForResolutionImpl.kt @@ -1,10 +1,17 @@ package net.corda.node.internal -import net.corda.core.contracts.* +import net.corda.core.contracts.Attachment +import net.corda.core.contracts.AttachmentResolutionException +import net.corda.core.contracts.ContractAttachment +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionResolutionException +import net.corda.core.contracts.TransactionState import net.corda.core.cordapp.CordappProvider import net.corda.core.internal.SerializedStateAndRef +import net.corda.core.internal.uncheckedCast import net.corda.core.node.NetworkParameters -import net.corda.core.node.ServicesForResolution import net.corda.core.node.services.AttachmentStorage import net.corda.core.node.services.IdentityService import net.corda.core.node.services.NetworkParametersService @@ -20,7 +27,7 @@ data class ServicesForResolutionImpl( override val cordappProvider: CordappProvider, override val networkParametersService: NetworkParametersService, private val validatedTransactions: TransactionStorage -) : ServicesForResolution { +) : NodeServicesForResolution { override val networkParameters: NetworkParameters get() = networkParametersService.lookup(networkParametersService.currentHash) ?: throw IllegalArgumentException("No current parameters in network parameters storage") @@ -30,13 +37,13 @@ data class ServicesForResolutionImpl( return stx.resolveBaseTransaction(this).outputs[stateRef.index] } - @Throws(TransactionResolutionException::class) - override fun loadStates(stateRefs: Set): Set> { - return stateRefs.groupBy { it.txhash }.flatMap { + override fun >> loadStates(input: Iterable, output: C): C { + input.groupBy { it.txhash }.forEach { val stx = validatedTransactions.getTransaction(it.key) ?: throw TransactionResolutionException(it.key) val baseTx = stx.resolveBaseTransaction(this) - it.value.map { ref -> StateAndRef(baseTx.outputs[ref.index], ref) } - }.toSet() + it.value.mapTo(output) { ref -> StateAndRef(uncheckedCast(baseTx.outputs[ref.index]), ref) } + } + return output } @Throws(TransactionResolutionException::class, AttachmentResolutionException::class) diff --git a/node/src/main/kotlin/net/corda/node/migration/VaultStateMigration.kt b/node/src/main/kotlin/net/corda/node/migration/VaultStateMigration.kt index dad25cf69f..28d8dc3a89 100644 --- a/node/src/main/kotlin/net/corda/node/migration/VaultStateMigration.kt +++ b/node/src/main/kotlin/net/corda/node/migration/VaultStateMigration.kt @@ -2,7 +2,6 @@ package net.corda.node.migration import liquibase.database.Database import net.corda.core.contracts.* -import net.corda.core.crypto.SecureHash import net.corda.core.identity.CordaX500Name import net.corda.core.node.services.Vault import net.corda.core.schemas.MappedSchema @@ -19,6 +18,7 @@ import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSchemaV1 +import net.corda.node.services.vault.toStateRef import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.SchemaMigration @@ -62,8 +62,7 @@ class VaultStateMigration : CordaMigration() { private fun getStateAndRef(persistentState: VaultSchemaV1.VaultStates): StateAndRef { val persistentStateRef = persistentState.stateRef ?: throw VaultStateMigrationException("Persistent state ref missing from state") - val txHash = SecureHash.create(persistentStateRef.txId) - val stateRef = StateRef(txHash, persistentStateRef.index) + val stateRef = persistentStateRef.toStateRef() val state = try { servicesForResolution.loadState(stateRef) } catch (e: Exception) { diff --git a/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt b/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt index 2208eef88f..f62db2eee4 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt @@ -2,8 +2,8 @@ package net.corda.node.services.events import net.corda.core.contracts.ScheduledStateRef import net.corda.core.contracts.StateRef -import net.corda.core.crypto.SecureHash import net.corda.core.schemas.PersistentStateRef +import net.corda.node.services.vault.toStateRef import net.corda.nodeapi.internal.persistence.CordaPersistence interface ScheduledFlowRepository { @@ -25,9 +25,8 @@ class PersistentScheduledFlowRepository(val database: CordaPersistence) : Schedu } private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair { - val txId = scheduledStateRecord.output.txId - val index = scheduledStateRecord.output.index - return Pair(StateRef(SecureHash.create(txId), index), ScheduledStateRef(StateRef(SecureHash.create(txId), index), scheduledStateRecord.scheduledAt)) + val stateRef = scheduledStateRecord.output.toStateRef() + return Pair(stateRef, ScheduledStateRef(stateRef, scheduledStateRecord.scheduledAt)) } override fun delete(key: StateRef): Boolean { diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 66ec2007fa..aa69d50db3 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -25,6 +25,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug +import net.corda.node.services.vault.toStateRef import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX @@ -157,13 +158,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { //TODO null check will become obsolete after making DB/JPA columns not nullable - val txId = it.id.txId - val index = it.id.index - Pair( - StateRef(txhash = SecureHash.create(txId), index = index), - SecureHash.create(it.consumingTxHash) - ) - + Pair(it.id.toStateRef(), SecureHash.create(it.consumingTxHash)) }, toPersistentEntity = { (txHash, index): StateRef, id: SecureHash -> CommittedState( diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 6db962cdce..5109bb0d3e 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -3,28 +3,65 @@ package net.corda.node.services.vault import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import net.corda.core.CordaRuntimeException -import net.corda.core.contracts.* +import net.corda.core.contracts.Amount +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.FungibleAsset +import net.corda.core.contracts.FungibleState +import net.corda.core.contracts.Issued +import net.corda.core.contracts.OwnableState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.crypto.SecureHash import net.corda.core.crypto.containsAny import net.corda.core.flows.HospitalizeFlowException -import net.corda.core.internal.* +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.TransactionDeserialisationException +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.tee +import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.DataFeed -import net.corda.core.node.ServicesForResolution import net.corda.core.node.StatesToRecord -import net.corda.core.node.services.* -import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo -import net.corda.core.node.services.vault.* +import net.corda.core.node.services.KeyManagementService +import net.corda.core.node.services.StatesNotAvailableException +import net.corda.core.node.services.Vault +import net.corda.core.node.services.VaultQueryException +import net.corda.core.node.services.VaultService +import net.corda.core.node.services.queryBy +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.node.services.vault.Sort +import net.corda.core.node.services.vault.SortAttribute +import net.corda.core.node.services.vault.builder import net.corda.core.observable.internal.OnResilientSubscribe import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.transactions.* -import net.corda.core.utilities.* +import net.corda.core.transactions.ContractUpgradeWireTransaction +import net.corda.core.transactions.CoreTransaction +import net.corda.core.transactions.FullTransaction +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.transactions.NotaryChangeWireTransaction +import net.corda.core.transactions.WireTransaction +import net.corda.core.utilities.NonEmptySet +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.core.utilities.toNonEmptySet +import net.corda.core.utilities.trace +import net.corda.node.internal.NodeServicesForResolution import net.corda.node.services.api.SchemaService import net.corda.node.services.api.VaultServiceInternal import net.corda.node.services.schema.PersistentStateService import net.corda.node.services.statemachine.FlowStateMachineImpl -import net.corda.nodeapi.internal.persistence.* +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit +import net.corda.nodeapi.internal.persistence.contextTransactionOrNull +import net.corda.nodeapi.internal.persistence.currentDBSession +import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import org.hibernate.Session +import org.hibernate.query.Query import rx.Observable import rx.exceptions.OnErrorNotImplementedException import rx.subjects.PublishSubject @@ -32,7 +69,8 @@ import java.security.PublicKey import java.sql.SQLException import java.time.Clock import java.time.Instant -import java.util.* +import java.util.Arrays +import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArraySet import javax.persistence.PersistenceException @@ -54,9 +92,9 @@ import javax.persistence.criteria.Root class NodeVaultService( private val clock: Clock, private val keyManagementService: KeyManagementService, - private val servicesForResolution: ServicesForResolution, + private val servicesForResolution: NodeServicesForResolution, private val database: CordaPersistence, - private val schemaService: SchemaService, + schemaService: SchemaService, private val appClassloader: ClassLoader ) : SingletonSerializeAsToken(), VaultServiceInternal { companion object { @@ -196,7 +234,7 @@ class NodeVaultService( if (lockId != null) { lockId = null lockUpdateTime = clock.instant() - log.trace("Releasing soft lock on consumed state: $stateRef") + log.trace { "Releasing soft lock on consumed state: $stateRef" } } session.save(state) } @@ -227,7 +265,7 @@ class NodeVaultService( } // we are not inside a flow, we are most likely inside a CordaService; // we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates. - return _rawUpdatesPublisher.resilientOnError() + _rawUpdatesPublisher.resilientOnError() } override val updates: Observable> @@ -639,7 +677,20 @@ class NodeVaultService( @Throws(VaultQueryException::class) override fun _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class): Vault.Page { try { - return _queryBy(criteria, paging, sorting, contractStateType, false) + // We decrement by one if the client requests MAX_VALUE, assuming they can not notice this because they don't have enough memory + // to request MAX_VALUE states at once. + val validPaging = if (paging.pageSize == Integer.MAX_VALUE) { + paging.copy(pageSize = Integer.MAX_VALUE - 1) + } else { + checkVaultQuery(paging.pageSize >= 1) { "Page specification: invalid page size ${paging.pageSize} [minimum is 1]" } + paging + } + if (!validPaging.isDefault) { + checkVaultQuery(validPaging.pageNumber >= DEFAULT_PAGE_NUM) { + "Page specification: invalid page number ${validPaging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]" + } + } + return queryBy(criteria, validPaging, sorting, contractStateType) } catch (e: VaultQueryException) { throw e } catch (e: Exception) { @@ -647,100 +698,100 @@ class NodeVaultService( } } - @Throws(VaultQueryException::class) - private fun _queryBy(criteria: QueryCriteria, paging_: PageSpecification, sorting: Sort, contractStateType: Class, skipPagingChecks: Boolean): Vault.Page { - // We decrement by one if the client requests MAX_PAGE_SIZE, assuming they can not notice this because they don't have enough memory - // to request `MAX_PAGE_SIZE` states at once. - val paging = if (paging_.pageSize == Integer.MAX_VALUE) { - paging_.copy(pageSize = Integer.MAX_VALUE - 1) - } else { - paging_ - } + private fun queryBy(criteria: QueryCriteria, + paging: PageSpecification, + sorting: Sort, + contractStateType: Class): Vault.Page { log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $paging, sorting: $sorting" } return database.transaction { // calculate total results where a page specification has been defined - var totalStates = -1L - if (!skipPagingChecks && !paging.isDefault) { - val count = builder { VaultSchemaV1.VaultStates::recordedTime.count() } - val countCriteria = QueryCriteria.VaultCustomQueryCriteria(count, Vault.StateStatus.ALL) - val results = _queryBy(criteria.and(countCriteria), PageSpecification(), Sort(emptyList()), contractStateType, true) // only skip pagination checks for total results count query - totalStates = results.otherResults.last() as Long - } + val totalStatesAvailable = if (paging.isDefault) -1 else queryTotalStateCount(criteria, contractStateType) - val session = getSession() - - val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java) - val queryRootVaultStates = criteriaQuery.from(VaultSchemaV1.VaultStates::class.java) - - // TODO: revisit (use single instance of parser for all queries) - val criteriaParser = HibernateQueryCriteriaParser(contractStateType, contractStateTypeMappings, criteriaBuilder, criteriaQuery, queryRootVaultStates) - - // parse criteria and build where predicates - criteriaParser.parse(criteria, sorting) - - // prepare query for execution - val query = session.createQuery(criteriaQuery) - - // pagination checks - if (!skipPagingChecks && !paging.isDefault) { - // pagination - if (paging.pageNumber < DEFAULT_PAGE_NUM) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]") - if (paging.pageSize < 1) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [minimum is 1]") - if (paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum is $MAX_PAGE_SIZE]") - } - - // For both SQLServer and PostgresSQL, firstResult must be >= 0. So we set a floor at 0. - // TODO: This is a catch-all solution. But why is the default pageNumber set to be -1 in the first place? - // Even if we set the default pageNumber to be 1 instead, that may not cover the non-default cases. - // So the floor may be necessary anyway. - query.firstResult = maxOf(0, (paging.pageNumber - 1) * paging.pageSize) - val pageSize = paging.pageSize + 1 - query.maxResults = if (pageSize > 0) pageSize else Integer.MAX_VALUE // detection too many results, protected against overflow + val (query, stateTypes) = createQuery(criteria, contractStateType, sorting) + query.setResultWindow(paging) // execution val results = query.resultList // final pagination check (fail-fast on too many results when no pagination specified) - if (!skipPagingChecks && paging.isDefault && results.size > DEFAULT_PAGE_SIZE) { - throw VaultQueryException("There are ${results.size} results, which exceeds the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. In order to retrieve these results, provide a `PageSpecification(pageNumber, pageSize)` to the method invoked.") + checkVaultQuery(!paging.isDefault || results.size != paging.pageSize + 1) { + "There are more results than the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. " + + "In order to retrieve these results, provide a PageSpecification to the method invoked." } - val statesAndRefs: MutableList> = mutableListOf() - val statesMeta: MutableList = mutableListOf() + + val resultsIterator = results.iterator() + + // From page 2 and onwards, the first result is the previous page anchor + val previousPageAnchor = if (paging.pageNumber > DEFAULT_PAGE_NUM && resultsIterator.hasNext()) { + val previousVaultState = resultsIterator.next()[0] as VaultSchemaV1.VaultStates + previousVaultState.stateRef!!.toStateRef() + } else { + null + } + + val statesMetadata: MutableList = mutableListOf() val otherResults: MutableList = mutableListOf() - val stateRefs = mutableSetOf() - results.asSequence() - .forEachIndexed { index, result -> - if (result[0] is VaultSchemaV1.VaultStates) { - if (!paging.isDefault && index == paging.pageSize) // skip last result if paged - return@forEachIndexed - val vaultState = result[0] as VaultSchemaV1.VaultStates - val stateRef = StateRef(SecureHash.create(vaultState.stateRef!!.txId), vaultState.stateRef!!.index) - stateRefs.add(stateRef) - statesMeta.add(Vault.StateMetadata(stateRef, - vaultState.contractStateClassName, - vaultState.recordedTime, - vaultState.consumedTime, - vaultState.stateStatus, - vaultState.notary, - vaultState.lockId, - vaultState.lockUpdateTime, - vaultState.relevancyStatus, - constraintInfo(vaultState.constraintType, vaultState.constraintData) - )) - } else { - // TODO: improve typing of returned other results - log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" } - otherResults.addAll(result.toArray().asList()) - } - } - if (stateRefs.isNotEmpty()) - statesAndRefs.addAll(uncheckedCast(servicesForResolution.loadStates(stateRefs))) + for (result in resultsIterator) { + val result0 = result[0] + if (result0 is VaultSchemaV1.VaultStates) { + statesMetadata.add(result0.toStateMetadata()) + } else { + log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" } + otherResults.addAll(result.toArray().asList()) + } + } - Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults) + val states: List> = servicesForResolution.loadStates( + statesMetadata.mapTo(LinkedHashSet()) { it.ref }, + ArrayList() + ) + + Vault.Page(states, statesMetadata, totalStatesAvailable, stateTypes, otherResults, previousPageAnchor) } } + private fun Query<*>.setResultWindow(paging: PageSpecification) { + // For both SQLServer and PostgresSQL, firstResult must be >= 0. + firstResult = 0 + if (paging.isDefault) { + // Peek ahead and see if there are more results in case pagination should be done + maxResults = paging.pageSize + 1 + } else if (paging.pageNumber == DEFAULT_PAGE_NUM) { + maxResults = paging.pageSize + } else { + // In addition to aligning the query to the correct result window for the page, also include the previous page's last + // result for the previousPageAnchor value. + firstResult = ((paging.pageNumber - 1) * paging.pageSize) - 1 + maxResults = paging.pageSize + 1 + } + } + + private fun queryTotalStateCount(baseCriteria: QueryCriteria, contractStateType: Class): Long { + val count = builder { VaultSchemaV1.VaultStates::recordedTime.count() } + val countCriteria = QueryCriteria.VaultCustomQueryCriteria(count, Vault.StateStatus.ALL) + val criteria = baseCriteria.and(countCriteria) + val (query) = createQuery(criteria, contractStateType, null) + val results = query.resultList + return results.last().toArray().last() as Long + } + + private fun createQuery(criteria: QueryCriteria, + contractStateType: Class, + sorting: Sort?): Pair, Vault.StateStatus> { + val criteriaQuery = criteriaBuilder.createQuery(Tuple::class.java) + val criteriaParser = HibernateQueryCriteriaParser( + contractStateType, + contractStateTypeMappings, + criteriaBuilder, + criteriaQuery, + criteriaQuery.from(VaultSchemaV1.VaultStates::class.java) + ) + criteriaParser.parse(criteria, sorting) + val query = getSession().createQuery(criteriaQuery) + return Pair(query, criteriaParser.stateTypes) + } + /** * Returns a [DataFeed] containing the results of the provided query, along with the associated observable, containing any subsequent updates. * @@ -775,6 +826,12 @@ class NodeVaultService( } } + private inline fun checkVaultQuery(value: Boolean, lazyMessage: () -> Any) { + if (!value) { + throw VaultQueryException(lazyMessage().toString()) + } + } + private fun filterContractStates(update: Vault.Update, contractStateType: Class) = update.copy(consumed = filterByContractState(contractStateType, update.consumed), produced = filterByContractState(contractStateType, update.produced)) @@ -802,6 +859,7 @@ class NodeVaultService( } private fun getSession() = database.currentOrNew().session + /** * Derive list from existing vault states and then incrementally update using vault observables */ diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt index 06844d40d0..09c71fe1f7 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt @@ -2,7 +2,9 @@ package net.corda.node.services.vault import net.corda.core.contracts.ContractState import net.corda.core.contracts.MAX_ISSUER_REF_SIZE +import net.corda.core.contracts.StateRef import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.toStringShort import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party @@ -192,3 +194,19 @@ object VaultSchemaV1 : MappedSchema( ) : IndirectStatePersistable } +fun PersistentStateRef.toStateRef(): StateRef = StateRef(SecureHash.create(txId), index) + +fun VaultSchemaV1.VaultStates.toStateMetadata(): Vault.StateMetadata { + return Vault.StateMetadata( + stateRef!!.toStateRef(), + contractStateClassName, + recordedTime, + consumedTime, + stateStatus, + notary, + lockId, + lockUpdateTime, + relevancyStatus, + Vault.ConstraintInfo.constraintInfo(constraintType, constraintData) + ) +} diff --git a/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmartNotaryService.kt b/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmartNotaryService.kt index a570ccd7b5..76094c2a1d 100644 --- a/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmartNotaryService.kt +++ b/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmartNotaryService.kt @@ -21,6 +21,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.PersistentUniquenessProvider +import net.corda.node.services.vault.toStateRef import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import java.security.PublicKey @@ -41,6 +42,8 @@ class BFTSmartNotaryService( ) : NotaryService() { companion object { private val log = contextLogger() + + @Suppress("unused") // Used by NotaryLoader via reflection @JvmStatic val serializationFilter get() = { clazz: Class<*> -> @@ -147,12 +150,7 @@ class BFTSmartNotaryService( toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { //TODO null check will become obsolete after making DB/JPA columns not nullable - val txId = it.id.txId - val index = it.id.index - Pair( - StateRef(txhash = SecureHash.create(txId), index = index), - SecureHash.create(it.consumingTxHash) - ) + Pair(it.id.toStateRef(), SecureHash.create(it.consumingTxHash)) }, toPersistentEntity = { (txHash, index): StateRef, id: SecureHash -> CommittedState( diff --git a/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt b/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt index d38a3f35b7..b678478da6 100644 --- a/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt @@ -24,6 +24,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug +import net.corda.node.services.vault.toStateRef import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.notary.common.InternalResult @@ -142,10 +143,6 @@ class JPAUniquenessProvider( fun encodeStateRef(s: StateRef): PersistentStateRef { return PersistentStateRef(s.txhash.toString(), s.index) } - - fun decodeStateRef(s: PersistentStateRef): StateRef { - return StateRef(txhash = SecureHash.create(s.txId), index = s.index) - } } /** @@ -215,15 +212,15 @@ class JPAUniquenessProvider( committedStates.addAll(existing) } - return committedStates.map { - val stateRef = StateRef(txhash = SecureHash.create(it.id.txId), index = it.id.index) + return committedStates.associate { + val stateRef = it.id.toStateRef() val consumingTxId = SecureHash.create(it.consumingTxHash) if (stateRef in references) { stateRef to StateConsumptionDetails(consumingTxId.reHash(), type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) } else { stateRef to StateConsumptionDetails(consumingTxId.reHash()) } - }.toMap() + } } private fun withRetry(block: () -> T): T { diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt index 1efb349ca0..30cdbe7f59 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt @@ -28,12 +28,14 @@ import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.test.SampleCashSchemaV1 import net.corda.finance.test.SampleCashSchemaV2 import net.corda.finance.test.SampleCashSchemaV3 +import net.corda.node.internal.NodeServicesForResolution import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.schema.ContractStateAndRef import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.PersistentStateService import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSchemaV1 +import net.corda.node.services.vault.toStateRef import net.corda.node.testing.DummyFungibleContract import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -48,7 +50,6 @@ import net.corda.testing.internal.vault.VaultFiller import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.assertj.core.api.Assertions -import org.assertj.core.api.Assertions.`in` import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.hibernate.SessionFactory @@ -122,7 +123,14 @@ class HibernateConfigurationTest { services = object : MockServices(cordappPackages, BOB_NAME, mock().also { doReturn(null).whenever(it).verifyAndRegisterIdentity(argThat { name == BOB_NAME }) }, generateKeyPair(), dummyNotary.keyPair) { - override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService, cordappClassloader).apply { start() } + override val vaultService = NodeVaultService( + Clock.systemUTC(), + keyManagementService, + servicesForResolution as NodeServicesForResolution, + database, + schemaService, + cordappClassloader + ).apply { start() } override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable) { for (stx in txs) { (validatedTransactions as WritableTransactionStorage).addTransaction(stx) @@ -183,7 +191,7 @@ class HibernateConfigurationTest { // execute query val queryResults = entityManager.createQuery(criteriaQuery).resultList val coins = queryResults.map { - services.loadState(toStateRef(it.stateRef!!)).data + services.loadState(it.stateRef!!.toStateRef()).data }.sumCash() assertThat(coins.toDecimal() >= BigDecimal("50.00")) } @@ -739,7 +747,7 @@ class HibernateConfigurationTest { val queryResults = entityManager.createQuery(criteriaQuery).resultList queryResults.forEach { - val cashState = services.loadState(toStateRef(it.stateRef!!)).data as Cash.State + val cashState = services.loadState(it.stateRef!!.toStateRef()).data as Cash.State println("${it.stateRef} with owner: ${cashState.owner.owningKey.toBase58String()}") } @@ -823,7 +831,7 @@ class HibernateConfigurationTest { // execute query val queryResults = entityManager.createQuery(criteriaQuery).resultList queryResults.forEach { - val cashState = services.loadState(toStateRef(it.stateRef!!)).data as Cash.State + val cashState = services.loadState(it.stateRef!!.toStateRef()).data as Cash.State println("${it.stateRef} with owner ${cashState.owner.owningKey.toBase58String()} and participants ${cashState.participants.map { it.owningKey.toBase58String() }}") } @@ -961,10 +969,6 @@ class HibernateConfigurationTest { } } - private fun toStateRef(pStateRef: PersistentStateRef): StateRef { - return StateRef(SecureHash.create(pStateRef.txId), pStateRef.index) - } - @Test(timeout=300_000) fun `schema change`() { fun createNewDB(schemas: Set, initialiseSchema: Boolean = true): CordaPersistence { diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt index 09964b6602..6344331c28 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt @@ -20,7 +20,6 @@ import net.corda.finance.* import net.corda.finance.contracts.CommercialPaper import net.corda.finance.contracts.Commodity import net.corda.finance.contracts.DealState -import net.corda.finance.workflows.asset.selection.AbstractCashSelection import net.corda.finance.contracts.asset.Cash import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.schemas.CashSchemaV1.PersistentCashState @@ -28,6 +27,7 @@ import net.corda.finance.schemas.CommercialPaperSchemaV1 import net.corda.finance.test.SampleCashSchemaV2 import net.corda.finance.test.SampleCashSchemaV3 import net.corda.finance.workflows.CommercialPaperUtils +import net.corda.finance.workflows.asset.selection.AbstractCashSelection import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseTransaction @@ -1669,22 +1669,28 @@ abstract class VaultQueryTestsBase : VaultQueryParties { // DOCEND VaultQueryExample7 assertThat(results.states).hasSize(10) assertThat(results.totalStatesAvailable).isEqualTo(100) + assertThat(results.previousPageAnchor).isNull() } } // pagination: last page @Test(timeout=300_000) - fun `all states with paging specification - last`() { + fun `all states with paging specification - last`() { database.transaction { vaultFiller.fillWithSomeTestCash(95.DOLLARS, notaryServices, 95, DUMMY_CASH_ISSUER) + val criteria = VaultQueryCriteria(status = Vault.StateStatus.ALL) + // Last page implies we need to perform a row count for the Query first, // and then re-query for a given offset defined by (count - pageSize) - val pagingSpec = PageSpecification(10, 10) + val lastPage = PageSpecification(10, 10) + val lastPageResults = vaultService.queryBy(criteria, paging = lastPage) + assertThat(lastPageResults.states).hasSize(5) // should retrieve states 90..94 + assertThat(lastPageResults.totalStatesAvailable).isEqualTo(95) - val criteria = VaultQueryCriteria(status = Vault.StateStatus.ALL) - val results = vaultService.queryBy(criteria, paging = pagingSpec) - assertThat(results.states).hasSize(5) // should retrieve states 90..94 - assertThat(results.totalStatesAvailable).isEqualTo(95) + // Make sure the previousPageAnchor points to the previous page's last result + val penultimatePage = lastPage.copy(pageNumber = lastPage.pageNumber - 1) + val penultimatePageResults = vaultService.queryBy(criteria, paging = penultimatePage) + assertThat(lastPageResults.previousPageAnchor).isEqualTo(penultimatePageResults.statesMetadata.last().ref) } } @@ -1723,7 +1729,7 @@ abstract class VaultQueryTestsBase : VaultQueryParties { @Test(timeout=300_000) fun `pagination not specified but more than default results available`() { expectedEx.expect(VaultQueryException::class.java) - expectedEx.expectMessage("provide a `PageSpecification(pageNumber, pageSize)`") + expectedEx.expectMessage("provide a PageSpecification") database.transaction { vaultFiller.fillWithSomeTestCash(201.DOLLARS, notaryServices, 201, DUMMY_CASH_ISSUER) @@ -1735,16 +1741,23 @@ abstract class VaultQueryTestsBase : VaultQueryParties { // example of querying states with paging using totalStatesAvailable private fun queryStatesWithPaging(vaultService: VaultService, pageSize: Int): List> { // DOCSTART VaultQueryExample24 - var pageNumber = DEFAULT_PAGE_NUM - val states = mutableListOf>() - do { - val pageSpec = PageSpecification(pageNumber = pageNumber, pageSize = pageSize) - val results = vaultService.queryBy(VaultQueryCriteria(), pageSpec) - states.addAll(results.states) - pageNumber++ - } while ((pageSpec.pageSize * (pageNumber - 1)) <= results.totalStatesAvailable) + retry@ + while (true) { + var pageNumber = DEFAULT_PAGE_NUM + val states = mutableListOf>() + do { + val pageSpec = PageSpecification(pageNumber = pageNumber, pageSize = pageSize) + val results = vaultService.queryBy(VaultQueryCriteria(), pageSpec) + if (results.previousPageAnchor != states.lastOrNull()?.ref) { + // Start querying from the 1st page again if we find the vault has changed from underneath us. + continue@retry + } + states.addAll(results.states) + pageNumber++ + } while ((pageSpec.pageSize * (pageNumber - 1)) <= results.totalStatesAvailable) + return states + } // DOCEND VaultQueryExample24 - return states.toList() } // test paging query example works @@ -1760,6 +1773,51 @@ abstract class VaultQueryTestsBase : VaultQueryParties { } } + @Test(timeout = 300_000) + fun `detecting changes to the database whilst pages are loaded`() { + val criteria = VaultQueryCriteria() + val sorting = Sort(setOf(Sort.SortColumn(SortAttribute.Standard(Sort.LinearStateAttribute.EXTERNAL_ID)))) + + fun loadPagesAndCheckAnchors(): List> { + val pages = (1..3).map { vaultService.queryBy(criteria, PageSpecification(it, 10), sorting) } + assertThat(pages[0].previousPageAnchor).isNull() + assertThat(pages[1].previousPageAnchor).isEqualTo(pages[0].states.last().ref) + assertThat(pages[2].previousPageAnchor).isEqualTo(pages[1].states.last().ref) + return pages + } + + database.transaction { + vaultFiller.fillWithSomeTestDeals(dealIds = (10..30).map(Int::toString)) + val pagesV1 = loadPagesAndCheckAnchors() + + vaultFiller.fillWithSomeTestDeals(dealIds = listOf("25.5")) // Insert a state into the middle of the second page + val pagesV2 = loadPagesAndCheckAnchors() + + assertThat(pagesV2[2].previousPageAnchor) + .isNotEqualTo(pagesV1[2].previousPageAnchor) // The previously loaded page is no longer in-sync + .isEqualTo(pagesV1[1].states.let { it[it.lastIndex - 1].ref }) // The anchor now points to the second to last entry + assertThat(pagesV2[1].previousPageAnchor).isEqualTo(pagesV1[1].previousPageAnchor) // The first page is unaffected + + vaultFiller.consumeDeals(pagesV1[0].states.take(1)) // Consume the first state + val pagesV3 = loadPagesAndCheckAnchors() + + assertThat(pagesV3[1].previousPageAnchor) + .isNotEqualTo(pagesV1[1].previousPageAnchor) // Now the first page is no longer in-sync + .isEqualTo(pagesV1[1].states[0].ref) // The top of the second page has now moved into the first page + + vaultFiller.consumeDeals(pagesV3[2].states) // Consume the entire third page + val pagesV4 = loadPagesAndCheckAnchors() + // There are now no states for the third page, but it will still have an anchor + assertThat(pagesV4[2].states).isEmpty() + + vaultFiller.consumeDeals(pagesV3[1].states.takeLast(1)) // Consume the third page anchor + + val thirdPageV5 = vaultService.queryBy(criteria, PageSpecification(3, 10), sorting) + assertThat(thirdPageV5.states).isEmpty() + assertThat(thirdPageV5.previousPageAnchor).isNull() + } + } + // test paging with aggregate function and group by clause @Test(timeout=300_000) fun `test paging with aggregate function and group by clause`() { diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt index 7e771e9904..ac621c9bff 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -10,7 +10,6 @@ import net.corda.core.flows.InitiatingFlow import net.corda.core.identity.AbstractParty import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.uncheckedCast -import net.corda.core.node.ServicesForResolution import net.corda.core.node.services.KeyManagementService import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition @@ -29,6 +28,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.core.singleIdentity import net.corda.testing.flows.registerCoreFlowFactory import net.corda.coretesting.internal.rigorousMock +import net.corda.node.internal.NodeServicesForResolution import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.enclosedCordapp import net.corda.testing.node.internal.startFlow @@ -86,7 +86,7 @@ class VaultSoftLockManagerTest { private val mockNet = InternalMockNetwork(cordappsForAllNodes = listOf(enclosedCordapp()), defaultFactory = { args -> object : InternalMockNetwork.MockNode(args) { override fun makeVaultService(keyManagementService: KeyManagementService, - services: ServicesForResolution, + services: NodeServicesForResolution, database: CordaPersistence, cordappLoader: CordappLoader): VaultServiceInternal { val node = this diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index f8a369ca0d..7c19eb0265 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -44,6 +44,7 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.coretesting.internal.DEV_ROOT_CA import net.corda.node.VersionInfo import net.corda.node.internal.ServicesForResolutionImpl +import net.corda.node.internal.NodeServicesForResolution import net.corda.node.internal.cordapp.JarScanningCordappLoader import net.corda.node.services.api.SchemaService import net.corda.node.services.api.ServiceHubInternal @@ -498,7 +499,14 @@ open class MockServices private constructor( get() = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParametersService, validatedTransactions) internal fun makeVaultService(schemaService: SchemaService, database: CordaPersistence, cordappLoader: CordappLoader): VaultServiceInternal { - return NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService, cordappLoader.appClassLoader).apply { start() } + return NodeVaultService( + clock, + keyManagementService, + servicesForResolution as NodeServicesForResolution, + database, + schemaService, + cordappLoader.appClassLoader + ).apply { start() } } // This needs to be internal as MutableClassToInstanceMap is a guava type and shouldn't be part of our public API