mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Replace Vault.PageAndUpdates with DataFeed data class (#931)
* Replace kotlin Pair with DataFeed data class * remove unintended changes * Replace Vault.PageAndUpdates with DataFeed data class * Remove PageAndUpdates
This commit is contained in:
@ -68,9 +68,9 @@ class CordaRPCOpsImpl(
|
||||
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria,
|
||||
paging: PageSpecification,
|
||||
sorting: Sort,
|
||||
contractType: Class<out T>): Vault.PageAndUpdates<T> {
|
||||
contractType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update> {
|
||||
return database.transaction {
|
||||
services.vaultQueryService._trackBy<T>(criteria, paging, sorting, contractType)
|
||||
services.vaultQueryService._trackBy(criteria, paging, sorting, contractType)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultQueryException
|
||||
import net.corda.core.node.services.VaultQueryService
|
||||
@ -20,7 +21,6 @@ import net.corda.core.serialization.storageKryo
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.services.vault.schemas.jpa.VaultSchemaV1
|
||||
import net.corda.node.utilities.wrapWithDatabaseTransaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import rx.subjects.PublishSubject
|
||||
import java.lang.Exception
|
||||
@ -63,7 +63,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
|
||||
// pagination
|
||||
if (paging.pageNumber < 0) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from 0]")
|
||||
if (paging.pageSize < 0 || paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum page size is ${MAX_PAGE_SIZE}]")
|
||||
if (paging.pageSize < 0 || paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum page size is ${MAX_PAGE_SIZE}]")
|
||||
|
||||
// count total results available
|
||||
val countQuery = criteriaBuilder.createQuery(Long::class.java)
|
||||
@ -99,15 +99,14 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox ({ updatesPublisher })
|
||||
private val mutex = ThreadBox({ updatesPublisher })
|
||||
|
||||
@Throws(VaultQueryException::class)
|
||||
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): Vault.PageAndUpdates<T> {
|
||||
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update> {
|
||||
return mutex.locked {
|
||||
val snapshotResults = _queryBy<T>(criteria, paging, sorting, contractType)
|
||||
Vault.PageAndUpdates(snapshotResults,
|
||||
updatesPublisher.bufferUntilSubscribed()
|
||||
.filter { it.containsType(contractType, snapshotResults.stateTypes) } )
|
||||
val updates = updatesPublisher.bufferUntilSubscribed().filter { it.containsType(contractType, snapshotResults.stateTypes) }
|
||||
DataFeed(snapshotResults, updates)
|
||||
}
|
||||
}
|
||||
|
||||
@ -115,7 +114,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
* Maintain a list of contract state interfaces to concrete types stored in the vault
|
||||
* for usage in generic queries of type queryBy<LinearState> or queryBy<FungibleState<*>>
|
||||
*/
|
||||
fun resolveUniqueContractStateTypes(session: EntityManager) : Map<String, List<String>> {
|
||||
fun resolveUniqueContractStateTypes(session: EntityManager): Map<String, List<String>> {
|
||||
val criteria = criteriaBuilder.createQuery(String::class.java)
|
||||
val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java)
|
||||
criteria.select(vaultStates.get("contractStateClassName")).distinct(true)
|
||||
@ -135,7 +134,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
return contractInterfaceToConcreteTypes
|
||||
}
|
||||
|
||||
private fun <T: ContractState> deriveContractInterfaces(clazz: Class<T>): Set<Class<T>> {
|
||||
private fun <T : ContractState> deriveContractInterfaces(clazz: Class<T>): Set<Class<T>> {
|
||||
val myInterfaces: MutableSet<Class<T>> = mutableSetOf()
|
||||
clazz.interfaces.forEach {
|
||||
if (!it.equals(ContractState::class.java)) {
|
||||
|
@ -7,6 +7,7 @@ import net.corda.contracts.asset.*;
|
||||
import net.corda.core.contracts.*;
|
||||
import net.corda.core.crypto.*;
|
||||
import net.corda.core.identity.*;
|
||||
import net.corda.core.messaging.DataFeed;
|
||||
import net.corda.core.node.services.*;
|
||||
import net.corda.core.node.services.vault.*;
|
||||
import net.corda.core.node.services.vault.QueryCriteria.*;
|
||||
@ -248,7 +249,7 @@ public class VaultQueryJavaTests {
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(Cash.State.class));
|
||||
|
||||
VaultQueryCriteria criteria = new VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, contractStateTypes);
|
||||
Vault.PageAndUpdates<ContractState> results = vaultQuerySvc.trackBy(ContractState.class, criteria);
|
||||
DataFeed<Vault.Page<ContractState>, Vault.Update> results = vaultQuerySvc.trackBy(ContractState.class, criteria);
|
||||
|
||||
Vault.Page<ContractState> snapshot = results.getCurrent();
|
||||
Observable<Vault.Update> updates = results.getFuture();
|
||||
@ -289,7 +290,7 @@ public class VaultQueryJavaTests {
|
||||
PageSpecification pageSpec = new PageSpecification(0, getMAX_PAGE_SIZE());
|
||||
Sort.SortColumn sortByUid = new Sort.SortColumn(new SortAttribute.Standard(Sort.LinearStateAttribute.UUID), Sort.Direction.DESC);
|
||||
Sort sorting = new Sort(ImmutableSet.of(sortByUid));
|
||||
Vault.PageAndUpdates<ContractState> results = vaultQuerySvc.trackBy(ContractState.class, compositeCriteria, pageSpec, sorting);
|
||||
DataFeed<Vault.Page<ContractState>, Vault.Update> results = vaultQuerySvc.trackBy(ContractState.class, compositeCriteria, pageSpec, sorting);
|
||||
|
||||
Vault.Page<ContractState> snapshot = results.getCurrent();
|
||||
Observable<Vault.Update> updates = results.getFuture();
|
||||
|
Reference in New Issue
Block a user