From ff0693a5984e83763baa68a498e80c4317bd1435 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Tue, 6 Jun 2023 11:25:14 +0100 Subject: [PATCH] ENT-9793: Use streams when loading vault query pages --- .../node/services/vault/NodeVaultService.kt | 61 +++++++++++-------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 5109bb0d3e..ec4984ea68 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -73,6 +73,7 @@ import java.util.Arrays import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArraySet +import java.util.stream.Stream import javax.persistence.PersistenceException import javax.persistence.Tuple import javax.persistence.criteria.CriteriaBuilder @@ -690,7 +691,10 @@ class NodeVaultService( "Page specification: invalid page number ${validPaging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]" } } - return queryBy(criteria, validPaging, sorting, contractStateType) + log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $validPaging, sorting: $sorting" } + return database.transaction { + queryBy(criteria, validPaging, sorting, contractStateType) + } } catch (e: VaultQueryException) { throw e } catch (e: Exception) { @@ -702,36 +706,25 @@ class NodeVaultService( paging: PageSpecification, sorting: Sort, contractStateType: Class): Vault.Page { - log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $paging, sorting: $sorting" } - return database.transaction { - // calculate total results where a page specification has been defined - val totalStatesAvailable = if (paging.isDefault) -1 else queryTotalStateCount(criteria, contractStateType) + // calculate total results where a page specification has been defined + val totalStatesAvailable = if (paging.isDefault) -1 else queryTotalStateCount(criteria, contractStateType) - val (query, stateTypes) = createQuery(criteria, contractStateType, sorting) - query.setResultWindow(paging) + val (query, stateTypes) = createQuery(criteria, contractStateType, sorting) + query.setResultWindow(paging) - // execution - val results = query.resultList - - // final pagination check (fail-fast on too many results when no pagination specified) - checkVaultQuery(!paging.isDefault || results.size != paging.pageSize + 1) { - "There are more results than the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. " + - "In order to retrieve these results, provide a PageSpecification to the method invoked." - } + var previousPageAnchor: StateRef? = null + val statesMetadata: MutableList = mutableListOf() + val otherResults: MutableList = mutableListOf() + query.resultStream(paging).use { results -> val resultsIterator = results.iterator() // From page 2 and onwards, the first result is the previous page anchor - val previousPageAnchor = if (paging.pageNumber > DEFAULT_PAGE_NUM && resultsIterator.hasNext()) { + if (paging.pageNumber > DEFAULT_PAGE_NUM && resultsIterator.hasNext()) { val previousVaultState = resultsIterator.next()[0] as VaultSchemaV1.VaultStates - previousVaultState.stateRef!!.toStateRef() - } else { - null + previousPageAnchor = previousVaultState.stateRef!!.toStateRef() } - val statesMetadata: MutableList = mutableListOf() - val otherResults: MutableList = mutableListOf() - for (result in resultsIterator) { val result0 = result[0] if (result0 is VaultSchemaV1.VaultStates) { @@ -741,13 +734,27 @@ class NodeVaultService( otherResults.addAll(result.toArray().asList()) } } + } - val states: List> = servicesForResolution.loadStates( - statesMetadata.mapTo(LinkedHashSet()) { it.ref }, - ArrayList() - ) + val states: List> = servicesForResolution.loadStates( + statesMetadata.mapTo(LinkedHashSet()) { it.ref }, + ArrayList() + ) - Vault.Page(states, statesMetadata, totalStatesAvailable, stateTypes, otherResults, previousPageAnchor) + return Vault.Page(states, statesMetadata, totalStatesAvailable, stateTypes, otherResults, previousPageAnchor) + } + + private fun Query.resultStream(paging: PageSpecification): Stream { + return if (paging.isDefault) { + val allResults = resultList + // final pagination check (fail-fast on too many results when no pagination specified) + checkVaultQuery(allResults.size != paging.pageSize + 1) { + "There are more results than the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. " + + "In order to retrieve these results, provide a PageSpecification to the method invoked." + } + allResults.stream() + } else { + stream() } }