Merge pull request #7384 from corda/shams-vault-page-stream

ENT-9793: Use streams when loading vault query pages
This commit is contained in:
Adel El-Beik 2023-06-06 15:30:51 +01:00 committed by GitHub
commit 2cdd931fcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -73,6 +73,7 @@ import java.util.Arrays
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArraySet
import java.util.stream.Stream
import javax.persistence.PersistenceException
import javax.persistence.Tuple
import javax.persistence.criteria.CriteriaBuilder
@ -690,7 +691,10 @@ class NodeVaultService(
"Page specification: invalid page number ${validPaging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]"
}
}
return queryBy(criteria, validPaging, sorting, contractStateType)
log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $validPaging, sorting: $sorting" }
return database.transaction {
queryBy(criteria, validPaging, sorting, contractStateType)
}
} catch (e: VaultQueryException) {
throw e
} catch (e: Exception) {
@ -702,36 +706,25 @@ class NodeVaultService(
paging: PageSpecification,
sorting: Sort,
contractStateType: Class<out T>): Vault.Page<T> {
log.debug { "Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $paging, sorting: $sorting" }
return database.transaction {
// calculate total results where a page specification has been defined
val totalStatesAvailable = if (paging.isDefault) -1 else queryTotalStateCount(criteria, contractStateType)
// calculate total results where a page specification has been defined
val totalStatesAvailable = if (paging.isDefault) -1 else queryTotalStateCount(criteria, contractStateType)
val (query, stateTypes) = createQuery(criteria, contractStateType, sorting)
query.setResultWindow(paging)
val (query, stateTypes) = createQuery(criteria, contractStateType, sorting)
query.setResultWindow(paging)
// execution
val results = query.resultList
// final pagination check (fail-fast on too many results when no pagination specified)
checkVaultQuery(!paging.isDefault || results.size != paging.pageSize + 1) {
"There are more results than the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. " +
"In order to retrieve these results, provide a PageSpecification to the method invoked."
}
var previousPageAnchor: StateRef? = null
val statesMetadata: MutableList<Vault.StateMetadata> = mutableListOf()
val otherResults: MutableList<Any> = mutableListOf()
query.resultStream(paging).use { results ->
val resultsIterator = results.iterator()
// From page 2 and onwards, the first result is the previous page anchor
val previousPageAnchor = if (paging.pageNumber > DEFAULT_PAGE_NUM && resultsIterator.hasNext()) {
if (paging.pageNumber > DEFAULT_PAGE_NUM && resultsIterator.hasNext()) {
val previousVaultState = resultsIterator.next()[0] as VaultSchemaV1.VaultStates
previousVaultState.stateRef!!.toStateRef()
} else {
null
previousPageAnchor = previousVaultState.stateRef!!.toStateRef()
}
val statesMetadata: MutableList<Vault.StateMetadata> = mutableListOf()
val otherResults: MutableList<Any> = mutableListOf()
for (result in resultsIterator) {
val result0 = result[0]
if (result0 is VaultSchemaV1.VaultStates) {
@ -741,13 +734,27 @@ class NodeVaultService(
otherResults.addAll(result.toArray().asList())
}
}
}
val states: List<StateAndRef<T>> = servicesForResolution.loadStates(
statesMetadata.mapTo(LinkedHashSet()) { it.ref },
ArrayList()
)
val states: List<StateAndRef<T>> = servicesForResolution.loadStates(
statesMetadata.mapTo(LinkedHashSet()) { it.ref },
ArrayList()
)
Vault.Page(states, statesMetadata, totalStatesAvailable, stateTypes, otherResults, previousPageAnchor)
return Vault.Page(states, statesMetadata, totalStatesAvailable, stateTypes, otherResults, previousPageAnchor)
}
private fun <R> Query<R>.resultStream(paging: PageSpecification): Stream<R> {
return if (paging.isDefault) {
val allResults = resultList
// final pagination check (fail-fast on too many results when no pagination specified)
checkVaultQuery(allResults.size != paging.pageSize + 1) {
"There are more results than the limit of $DEFAULT_PAGE_SIZE for queries that do not specify paging. " +
"In order to retrieve these results, provide a PageSpecification to the method invoked."
}
allResults.stream()
} else {
stream()
}
}