Vault Query Pagination simplification (#997)

* Pagination improvements (fail-fast on too many results without pagination specification)
* Fix incorrectly returned results count.
* Performance optimisation: only return totalStatesAvailable count on Pagination specification.
* Changed DEFAULT_PAGE_NUMBER to 1 (eg. page numbering starts from 1)
* Changed MAX_PAGE_SIZE to Int.MAX_VALUE
* Fixed compiler WARNINGs in Unit tests.
* Fixed minimum page size check (1).
* Updated API-RST docs with behavioural notes.
* Updated documentation (RST and API);
This commit is contained in:
josecoll
2017-07-12 09:53:15 +01:00
committed by GitHub
parent ac07b3fe94
commit 5f7b8f6ec3
9 changed files with 182 additions and 147 deletions

View File

@ -95,6 +95,7 @@ class HibernateQueryCriteriaParser(val contractType: Class<out ContractState>,
}
is ColumnPredicate.BinaryComparison -> {
column as Path<Comparable<Any?>?>
@Suppress("UNCHECKED_CAST")
val literal = columnPredicate.rightLiteral as Comparable<Any?>?
when (columnPredicate.operator) {
BinaryComparisonOperator.GREATER_THAN -> criteriaBuilder.greaterThan(column, literal)
@ -117,8 +118,11 @@ class HibernateQueryCriteriaParser(val contractType: Class<out ContractState>,
}
}
is ColumnPredicate.Between -> {
@Suppress("UNCHECKED_CAST")
column as Path<Comparable<Any?>?>
@Suppress("UNCHECKED_CAST")
val fromLiteral = columnPredicate.rightFromLiteral as Comparable<Any?>?
@Suppress("UNCHECKED_CAST")
val toLiteral = columnPredicate.rightToLiteral as Comparable<Any?>?
criteriaBuilder.between(column, fromLiteral, toLiteral)
}
@ -164,6 +168,7 @@ class HibernateQueryCriteriaParser(val contractType: Class<out ContractState>,
val columnPredicate = expression.predicate
when (columnPredicate) {
is ColumnPredicate.AggregateFunction -> {
@Suppress("UNCHECKED_CAST")
column as Path<Long?>?
val aggregateExpression =
when (columnPredicate.type) {

View File

@ -11,10 +11,8 @@ import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultQueryException
import net.corda.core.node.services.VaultQueryService
import net.corda.core.node.services.vault.MAX_PAGE_SIZE
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.node.services.vault.*
import net.corda.core.node.services.vault.QueryCriteria.VaultCustomQueryCriteria
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.storageKryo
@ -43,6 +41,15 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
override fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): Vault.Page<T> {
log.info("Vault Query for contract type: $contractType, criteria: $criteria, pagination: $paging, sorting: $sorting")
// calculate total results where a page specification has been defined
var totalStates = -1L
if (!paging.isDefault) {
val count = builder { VaultSchemaV1.VaultStates::recordedTime.count() }
val countCriteria = VaultCustomQueryCriteria(count)
val results = queryBy(contractType, criteria.and(countCriteria))
totalStates = results.otherResults[0] as Long
}
val session = sessionFactory.withOptions().
connection(TransactionManager.current().connection).
openSession()
@ -62,43 +69,45 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
// prepare query for execution
val query = session.createQuery(criteriaQuery)
// pagination
if (paging.pageNumber < 0) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from 0]")
if (paging.pageSize < 0 || paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum page size is ${MAX_PAGE_SIZE}]")
// pagination checks
if (!paging.isDefault) {
// pagination
if (paging.pageNumber < DEFAULT_PAGE_NUM) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]")
if (paging.pageSize < 1) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [must be a value between 1 and $MAX_PAGE_SIZE]")
}
// count total results available
val countQuery = criteriaBuilder.createQuery(Long::class.java)
countQuery.select(criteriaBuilder.count(countQuery.from(VaultSchemaV1.VaultStates::class.java)))
val totalStates = session.createQuery(countQuery).singleResult.toInt()
if ((paging.pageNumber != 0) && (paging.pageSize * paging.pageNumber >= totalStates))
throw VaultQueryException("Requested more results than available [${paging.pageSize} * ${paging.pageNumber} >= ${totalStates}]")
query.firstResult = paging.pageNumber * paging.pageSize
query.maxResults = paging.pageSize
query.firstResult = (paging.pageNumber - 1) * paging.pageSize
query.maxResults = paging.pageSize + 1 // detection too many results
// execution
val results = query.resultList
val statesAndRefs: MutableList<StateAndRef<*>> = mutableListOf()
// final pagination check (fail-fast on too many results when no pagination specified)
if (paging.isDefault && results.size > DEFAULT_PAGE_SIZE)
throw VaultQueryException("Please specify a `PageSpecification` as there are more results [${results.size}] than the default page size [$DEFAULT_PAGE_SIZE]")
val statesAndRefs: MutableList<StateAndRef<T>> = mutableListOf()
val statesMeta: MutableList<Vault.StateMetadata> = mutableListOf()
val otherResults: MutableList<Any> = mutableListOf()
results.asSequence()
.forEach { it ->
if (it[0] is VaultSchemaV1.VaultStates) {
val it = it[0] as VaultSchemaV1.VaultStates
val stateRef = StateRef(SecureHash.parse(it.stateRef!!.txId!!), it.stateRef!!.index!!)
val state = it.contractState.deserialize<TransactionState<T>>(storageKryo())
statesMeta.add(Vault.StateMetadata(stateRef, it.contractStateClassName, it.recordedTime, it.consumedTime, it.stateStatus, it.notaryName, it.notaryKey, it.lockId, it.lockUpdateTime))
.forEachIndexed { index, result ->
if (result[0] is VaultSchemaV1.VaultStates) {
if (!paging.isDefault && index == paging.pageSize) // skip last result if paged
return@forEachIndexed
val vaultState = result[0] as VaultSchemaV1.VaultStates
val stateRef = StateRef(SecureHash.parse(vaultState.stateRef!!.txId!!), vaultState.stateRef!!.index!!)
val state = vaultState.contractState.deserialize<TransactionState<T>>(storageKryo())
statesMeta.add(Vault.StateMetadata(stateRef, vaultState.contractStateClassName, vaultState.recordedTime, vaultState.consumedTime, vaultState.stateStatus, vaultState.notaryName, vaultState.notaryKey, vaultState.lockId, vaultState.lockUpdateTime))
statesAndRefs.add(StateAndRef(state, stateRef))
}
else {
log.debug { "OtherResults: ${Arrays.toString(it.toArray())}" }
otherResults.addAll(it.toArray().asList())
log.debug { "OtherResults: ${Arrays.toString(result.toArray())}" }
otherResults.addAll(result.toArray().asList())
}
}
return Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, pageable = paging, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults) as Vault.Page<T>
return Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults)
} catch (e: Exception) {
log.error(e.message)
@ -132,6 +141,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
val contractInterfaceToConcreteTypes = mutableMapOf<String, MutableList<String>>()
distinctTypes.forEach { it ->
@Suppress("UNCHECKED_CAST")
val concreteType = Class.forName(it) as Class<ContractState>
val contractInterfaces = deriveContractInterfaces(concreteType)
contractInterfaces.map {
@ -146,6 +156,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
val myInterfaces: MutableSet<Class<T>> = mutableSetOf()
clazz.interfaces.forEach {
if (!it.equals(ContractState::class.java)) {
@Suppress("UNCHECKED_CAST")
myInterfaces.add(it as Class<T>)
myInterfaces.addAll(deriveContractInterfaces(it))
}

View File

@ -1,62 +1,45 @@
package net.corda.node.services.vault;
import com.google.common.collect.ImmutableSet;
import kotlin.Pair;
import net.corda.contracts.DealState;
import net.corda.contracts.asset.Cash;
import com.google.common.collect.*;
import kotlin.*;
import net.corda.contracts.*;
import net.corda.contracts.asset.*;
import net.corda.core.contracts.*;
import net.corda.testing.contracts.DummyLinearContract;
import net.corda.core.crypto.*;
import net.corda.core.identity.AbstractParty;
import net.corda.core.messaging.DataFeed;
import net.corda.core.node.services.Vault;
import net.corda.core.node.services.VaultQueryException;
import net.corda.core.node.services.VaultQueryService;
import net.corda.core.node.services.VaultService;
import net.corda.core.identity.*;
import net.corda.core.messaging.*;
import net.corda.core.node.services.*;
import net.corda.core.node.services.vault.*;
import net.corda.core.node.services.vault.QueryCriteria.LinearStateQueryCriteria;
import net.corda.core.node.services.vault.QueryCriteria.VaultCustomQueryCriteria;
import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria;
import net.corda.core.schemas.MappedSchema;
import net.corda.core.schemas.testing.DummyLinearStateSchemaV1;
import net.corda.core.utilities.OpaqueBytes;
import net.corda.core.transactions.SignedTransaction;
import net.corda.core.transactions.WireTransaction;
import net.corda.node.services.database.HibernateConfiguration;
import net.corda.node.services.schema.NodeSchemaService;
import net.corda.schemas.CashSchemaV1;
import net.corda.testing.TestConstants;
import net.corda.testing.contracts.VaultFiller;
import net.corda.testing.node.MockServices;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.exposed.sql.Database;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import net.corda.core.node.services.vault.QueryCriteria.*;
import net.corda.core.schemas.*;
import net.corda.core.schemas.testing.*;
import net.corda.core.transactions.*;
import net.corda.core.utilities.*;
import net.corda.node.services.database.*;
import net.corda.node.services.schema.*;
import net.corda.schemas.*;
import net.corda.testing.*;
import net.corda.testing.contracts.*;
import net.corda.testing.node.*;
import org.jetbrains.annotations.*;
import org.jetbrains.exposed.sql.*;
import org.junit.*;
import rx.Observable;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.io.*;
import java.lang.reflect.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.stream.*;
import static net.corda.contracts.asset.CashKt.getDUMMY_CASH_ISSUER;
import static net.corda.contracts.asset.CashKt.getDUMMY_CASH_ISSUER_KEY;
import static net.corda.testing.CoreTestUtils.getBOC;
import static net.corda.testing.CoreTestUtils.getBOC_KEY;
import static net.corda.testing.CoreTestUtils.getBOC_PUBKEY;
import static net.corda.core.contracts.ContractsDSL.USD;
import static net.corda.core.node.services.vault.QueryCriteriaUtils.MAX_PAGE_SIZE;
import static net.corda.node.utilities.DatabaseSupportKt.configureDatabase;
import static net.corda.contracts.asset.CashKt.*;
import static net.corda.core.contracts.ContractsDSL.*;
import static net.corda.core.node.services.vault.QueryCriteriaUtils.*;
import static net.corda.node.utilities.DatabaseSupportKt.*;
import static net.corda.node.utilities.DatabaseSupportKt.transaction;
import static net.corda.testing.CoreTestUtils.getMEGA_CORP;
import static net.corda.testing.CoreTestUtils.getMEGA_CORP_KEY;
import static net.corda.testing.node.MockServicesKt.makeTestDataSourceProperties;
import static net.corda.testing.CoreTestUtils.*;
import static net.corda.testing.node.MockServicesKt.*;
import static net.corda.core.utilities.ByteArrays.toHexString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.*;
public class VaultQueryJavaTests {
@ -146,7 +129,7 @@ public class VaultQueryJavaTests {
List<StateRef> stateRefs = stateRefsStream.collect(Collectors.toList());
SortAttribute.Standard sortAttribute = new SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID);
Sort sorting = new Sort(Arrays.asList(new Sort.SortColumn(sortAttribute, Sort.Direction.ASC)));
Sort sorting = new Sort(Collections.singletonList(new Sort.SortColumn(sortAttribute, Sort.Direction.ASC)));
VaultQueryCriteria criteria = new VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, null, stateRefs);
Vault.Page<DummyLinearContract.State> results = vaultQuerySvc.queryBy(DummyLinearContract.State.class, criteria, sorting);
@ -219,7 +202,7 @@ public class VaultQueryJavaTests {
QueryCriteria compositeCriteria1 = dealCriteriaAll.or(linearCriteriaAll);
QueryCriteria compositeCriteria2 = vaultCriteria.and(compositeCriteria1);
PageSpecification pageSpec = new PageSpecification(0, MAX_PAGE_SIZE);
PageSpecification pageSpec = new PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE);
Sort.SortColumn sortByUid = new Sort.SortColumn(new SortAttribute.Standard(Sort.LinearStateAttribute.UUID), Sort.Direction.DESC);
Sort sorting = new Sort(ImmutableSet.of(sortByUid));
Vault.Page<LinearState> results = vaultQuerySvc.queryBy(LinearState.class, compositeCriteria2, pageSpec, sorting);
@ -232,6 +215,7 @@ public class VaultQueryJavaTests {
}
@Test
@SuppressWarnings("unchecked")
public void customQueryForCashStatesWithAmountOfCurrencyGreaterOrEqualThanQuantity() {
transaction(database, tx -> {
@ -328,7 +312,7 @@ public class VaultQueryJavaTests {
QueryCriteria dealOrLinearIdCriteria = dealCriteria.or(linearCriteria);
QueryCriteria compositeCriteria = dealOrLinearIdCriteria.and(vaultCriteria);
PageSpecification pageSpec = new PageSpecification(0, MAX_PAGE_SIZE);
PageSpecification pageSpec = new PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE);
Sort.SortColumn sortByUid = new Sort.SortColumn(new SortAttribute.Standard(Sort.LinearStateAttribute.UUID), Sort.Direction.DESC);
Sort sorting = new Sort(ImmutableSet.of(sortByUid));
DataFeed<Vault.Page<ContractState>, Vault.Update> results = vaultQuerySvc.trackBy(ContractState.class, compositeCriteria, pageSpec, sorting);
@ -408,6 +392,7 @@ public class VaultQueryJavaTests {
*/
@Test
@SuppressWarnings("unchecked")
public void aggregateFunctionsWithoutGroupClause() {
transaction(database, tx -> {
@ -452,6 +437,7 @@ public class VaultQueryJavaTests {
}
@Test
@SuppressWarnings("unchecked")
public void aggregateFunctionsWithSingleGroupClause() {
transaction(database, tx -> {
@ -472,11 +458,11 @@ public class VaultQueryJavaTests {
Field pennies = CashSchemaV1.PersistentCashState.class.getDeclaredField("pennies");
Field currency = CashSchemaV1.PersistentCashState.class.getDeclaredField("currency");
QueryCriteria sumCriteria = new VaultCustomQueryCriteria(Builder.sum(pennies, Arrays.asList(currency)));
QueryCriteria sumCriteria = new VaultCustomQueryCriteria(Builder.sum(pennies, Collections.singletonList(currency)));
QueryCriteria countCriteria = new VaultCustomQueryCriteria(Builder.count(pennies));
QueryCriteria maxCriteria = new VaultCustomQueryCriteria(Builder.max(pennies, Arrays.asList(currency)));
QueryCriteria minCriteria = new VaultCustomQueryCriteria(Builder.min(pennies, Arrays.asList(currency)));
QueryCriteria avgCriteria = new VaultCustomQueryCriteria(Builder.avg(pennies, Arrays.asList(currency)));
QueryCriteria maxCriteria = new VaultCustomQueryCriteria(Builder.max(pennies, Collections.singletonList(currency)));
QueryCriteria minCriteria = new VaultCustomQueryCriteria(Builder.min(pennies, Collections.singletonList(currency)));
QueryCriteria avgCriteria = new VaultCustomQueryCriteria(Builder.avg(pennies, Collections.singletonList(currency)));
QueryCriteria criteria = sumCriteria.and(countCriteria).and(maxCriteria).and(minCriteria).and(avgCriteria);
Vault.Page<Cash.State> results = vaultQuerySvc.queryBy(Cash.State.class, criteria);
@ -522,6 +508,7 @@ public class VaultQueryJavaTests {
}
@Test
@SuppressWarnings("unchecked")
public void aggregateFunctionsSumByIssuerAndCurrencyAndSortByAggregateSum() {
transaction(database, tx -> {

View File

@ -37,10 +37,8 @@ import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import org.junit.*
import org.junit.rules.ExpectedException
import java.io.Closeable
import java.lang.Thread.sleep
import java.math.BigInteger
@ -825,7 +823,7 @@ class VaultQueryTests {
// Last page implies we need to perform a row count for the Query first,
// and then re-query for a given offset defined by (count - pageSize)
val pagingSpec = PageSpecification(9, 10)
val pagingSpec = PageSpecification(10, 10)
val criteria = VaultQueryCriteria(status = Vault.StateStatus.ALL)
val results = vaultQuerySvc.queryBy<ContractState>(criteria, paging = pagingSpec)
@ -834,48 +832,54 @@ class VaultQueryTests {
}
}
@get:Rule
val expectedEx = ExpectedException.none()!!
// pagination: invalid page number
@Test(expected = VaultQueryException::class)
@Test
fun `invalid page number`() {
expectedEx.expect(VaultQueryException::class.java)
expectedEx.expectMessage("Page specification: invalid page number")
database.transaction {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 100, 100, Random(0L))
val pagingSpec = PageSpecification(-1, 10)
val pagingSpec = PageSpecification(0, 10)
val criteria = VaultQueryCriteria(status = Vault.StateStatus.ALL)
val results = vaultQuerySvc.queryBy<ContractState>(criteria, paging = pagingSpec)
assertThat(results.states).hasSize(10) // should retrieve states 90..99
vaultQuerySvc.queryBy<ContractState>(criteria, paging = pagingSpec)
}
}
// pagination: invalid page size
@Test(expected = VaultQueryException::class)
@Test
fun `invalid page size`() {
expectedEx.expect(VaultQueryException::class.java)
expectedEx.expectMessage("Page specification: invalid page size")
database.transaction {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 100, 100, Random(0L))
val pagingSpec = PageSpecification(0, MAX_PAGE_SIZE + 1)
val pagingSpec = PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE + 1) // overflow = -2147483648
val criteria = VaultQueryCriteria(status = Vault.StateStatus.ALL)
vaultQuerySvc.queryBy<ContractState>(criteria, paging = pagingSpec)
assertFails { }
}
}
// pagination: out or range request (page number * page size) > total rows available
@Test(expected = VaultQueryException::class)
fun `out of range page request`() {
// pagination not specified but more than DEFAULT_PAGE_SIZE results available (fail-fast test)
@Test
fun `pagination not specified but more than default results available`() {
expectedEx.expect(VaultQueryException::class.java)
expectedEx.expectMessage("Please specify a `PageSpecification`")
database.transaction {
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 100, 100, Random(0L))
val pagingSpec = PageSpecification(10, 10) // this requests results 101 .. 110
services.fillWithSomeTestCash(201.DOLLARS, DUMMY_NOTARY, 201, 201, Random(0L))
val criteria = VaultQueryCriteria(status = Vault.StateStatus.ALL)
val results = vaultQuerySvc.queryBy<ContractState>(criteria, paging = pagingSpec)
assertFails { println("Query should throw an exception [${results.states.count()}]") }
vaultQuerySvc.queryBy<ContractState>(criteria)
}
}
@ -1776,7 +1780,7 @@ class VaultQueryTests {
updates
}
updates?.expectEvents {
updates.expectEvents {
sequence(
expect { (consumed, produced, flowId) ->
require(flowId == null) {}
@ -1823,7 +1827,7 @@ class VaultQueryTests {
updates
}
updates?.expectEvents {
updates.expectEvents {
sequence(
expect { (consumed, produced, flowId) ->
require(flowId == null) {}
@ -1870,7 +1874,7 @@ class VaultQueryTests {
updates
}
updates?.expectEvents {
updates.expectEvents {
sequence(
expect { (consumed, produced, flowId) ->
require(flowId == null) {}
@ -1926,7 +1930,7 @@ class VaultQueryTests {
updates
}
updates?.expectEvents {
updates.expectEvents {
sequence(
expect { (consumed, produced, flowId) ->
require(flowId == null) {}
@ -1976,7 +1980,7 @@ class VaultQueryTests {
updates
}
updates?.expectEvents {
updates.expectEvents {
sequence(
expect { (consumed, produced, flowId) ->
require(flowId == null) {}