diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 807519cb96..cc79f7e26a 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -125,23 +125,23 @@ interface CordaRPCOps : RPCOps { fun vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, - contractType: Class): Vault.PageAndUpdates + contractType: Class): DataFeed, Vault.Update> // DOCEND VaultTrackByAPI // Note: cannot apply @JvmOverloads to interfaces nor interface implementations // Java Helpers // DOCSTART VaultTrackAPIHelpers - fun vaultTrack(contractType: Class): Vault.PageAndUpdates { + fun vaultTrack(contractType: Class): DataFeed, Vault.Update> { return vaultTrackBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractType) } - fun vaultTrackByCriteria(contractType: Class, criteria: QueryCriteria): Vault.PageAndUpdates { + fun vaultTrackByCriteria(contractType: Class, criteria: QueryCriteria): DataFeed, Vault.Update> { return vaultTrackBy(criteria, PageSpecification(), Sort(emptySet()), contractType) } - fun vaultTrackByWithPagingSpec(contractType: Class, criteria: QueryCriteria, paging: PageSpecification): Vault.PageAndUpdates { + fun vaultTrackByWithPagingSpec(contractType: Class, criteria: QueryCriteria, paging: PageSpecification): DataFeed, Vault.Update> { return vaultTrackBy(criteria, paging, Sort(emptySet()), contractType) } - fun vaultTrackByWithSorting(contractType: Class, criteria: QueryCriteria, sorting: Sort): Vault.PageAndUpdates { + fun vaultTrackByWithSorting(contractType: Class, criteria: QueryCriteria, sorting: Sort): DataFeed, Vault.Update> { return vaultTrackBy(criteria, PageSpecification(), sorting, contractType) } // DOCEND VaultTrackAPIHelpers @@ -302,7 +302,7 @@ inline fun CordaRPCOps.vaultQueryBy(criteria: QueryC inline fun CordaRPCOps.vaultTrackBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), paging: PageSpecification = PageSpecification(), - sorting: Sort = Sort(emptySet())): Vault.PageAndUpdates { + sorting: Sort = Sort(emptySet())): DataFeed, Vault.Update> { return vaultTrackBy(criteria, paging, sorting, T::class.java) } @@ -402,4 +402,8 @@ data class DataFeed(val snapshot: A, val updates: Observable) { val first: A get() = snapshot @Deprecated("This function will be removed in a future milestone", ReplaceWith("updates")) val second: Observable get() = updates + @Deprecated("This function will be removed in a future milestone", ReplaceWith("snapshot")) + val current: A get() = snapshot + @Deprecated("This function will be removed in a future milestone", ReplaceWith("updates")) + val future: Observable get() = updates } diff --git a/core/src/main/kotlin/net/corda/core/node/services/Services.kt b/core/src/main/kotlin/net/corda/core/node/services/Services.kt index db14d69bfd..432140cf8f 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/Services.kt @@ -141,9 +141,6 @@ class Vault(val states: Iterable>) { val notaryKey: String, val lockId: String?, val lockUpdateTime: Instant?) - - @CordaSerializable - data class PageAndUpdates(val current: Vault.Page, val future: Observable) } /** @@ -382,7 +379,7 @@ interface VaultQueryService { fun _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, - contractType: Class): Vault.PageAndUpdates + contractType: Class): DataFeed, Vault.Update> // DOCEND VaultQueryAPI // Note: cannot apply @JvmOverloads to interfaces nor interface implementations @@ -406,16 +403,16 @@ interface VaultQueryService { fun trackBy(contractType: Class): Vault.Page { return _queryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractType) } - fun trackBy(contractType: Class, criteria: QueryCriteria): Vault.PageAndUpdates { + fun trackBy(contractType: Class, criteria: QueryCriteria): DataFeed, Vault.Update> { return _trackBy(criteria, PageSpecification(), Sort(emptySet()), contractType) } - fun trackBy(contractType: Class, criteria: QueryCriteria, paging: PageSpecification): Vault.PageAndUpdates { + fun trackBy(contractType: Class, criteria: QueryCriteria, paging: PageSpecification): DataFeed, Vault.Update> { return _trackBy(criteria, paging, Sort(emptySet()), contractType) } - fun trackBy(contractType: Class, criteria: QueryCriteria, sorting: Sort): Vault.PageAndUpdates { + fun trackBy(contractType: Class, criteria: QueryCriteria, sorting: Sort): DataFeed, Vault.Update> { return _trackBy(criteria, PageSpecification(), sorting, contractType) } - fun trackBy(contractType: Class, criteria: QueryCriteria, paging: PageSpecification, sorting: Sort): Vault.PageAndUpdates { + fun trackBy(contractType: Class, criteria: QueryCriteria, paging: PageSpecification, sorting: Sort): DataFeed, Vault.Update> { return _trackBy(criteria, paging, sorting, contractType) } } @@ -440,23 +437,23 @@ inline fun VaultQueryService.queryBy(criteria: Query return _queryBy(criteria, paging, sorting, T::class.java) } -inline fun VaultQueryService.trackBy(): Vault.PageAndUpdates { +inline fun VaultQueryService.trackBy(): DataFeed, Vault.Update> { return _trackBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), T::class.java) } -inline fun VaultQueryService.trackBy(criteria: QueryCriteria): Vault.PageAndUpdates { +inline fun VaultQueryService.trackBy(criteria: QueryCriteria): DataFeed, Vault.Update> { return _trackBy(criteria, PageSpecification(), Sort(emptySet()), T::class.java) } -inline fun VaultQueryService.trackBy(criteria: QueryCriteria, paging: PageSpecification): Vault.PageAndUpdates { +inline fun VaultQueryService.trackBy(criteria: QueryCriteria, paging: PageSpecification): DataFeed, Vault.Update> { return _trackBy(criteria, paging, Sort(emptySet()), T::class.java) } -inline fun VaultQueryService.trackBy(criteria: QueryCriteria, sorting: Sort): Vault.PageAndUpdates { +inline fun VaultQueryService.trackBy(criteria: QueryCriteria, sorting: Sort): DataFeed, Vault.Update> { return _trackBy(criteria, PageSpecification(), sorting, T::class.java) } -inline fun VaultQueryService.trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort): Vault.PageAndUpdates { +inline fun VaultQueryService.trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort): DataFeed, Vault.Update> { return _trackBy(criteria, paging, sorting, T::class.java) } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 881ae4644d..f8d4f77378 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -68,9 +68,9 @@ class CordaRPCOpsImpl( override fun vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, - contractType: Class): Vault.PageAndUpdates { + contractType: Class): DataFeed, Vault.Update> { return database.transaction { - services.vaultQueryService._trackBy(criteria, paging, sorting, contractType) + services.vaultQueryService._trackBy(criteria, paging, sorting, contractType) } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/HibernateVaultQueryImpl.kt b/node/src/main/kotlin/net/corda/node/services/vault/HibernateVaultQueryImpl.kt index a014a3b545..4d43bd702d 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/HibernateVaultQueryImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/HibernateVaultQueryImpl.kt @@ -7,6 +7,7 @@ import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef import net.corda.core.contracts.TransactionState import net.corda.core.crypto.SecureHash +import net.corda.core.messaging.DataFeed import net.corda.core.node.services.Vault import net.corda.core.node.services.VaultQueryException import net.corda.core.node.services.VaultQueryService @@ -20,7 +21,6 @@ import net.corda.core.serialization.storageKryo import net.corda.core.utilities.loggerFor import net.corda.node.services.database.HibernateConfiguration import net.corda.node.services.vault.schemas.jpa.VaultSchemaV1 -import net.corda.node.utilities.wrapWithDatabaseTransaction import org.jetbrains.exposed.sql.transactions.TransactionManager import rx.subjects.PublishSubject import java.lang.Exception @@ -63,7 +63,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration, // pagination if (paging.pageNumber < 0) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from 0]") - if (paging.pageSize < 0 || paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum page size is ${MAX_PAGE_SIZE}]") + if (paging.pageSize < 0 || paging.pageSize > MAX_PAGE_SIZE) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [maximum page size is ${MAX_PAGE_SIZE}]") // count total results available val countQuery = criteriaBuilder.createQuery(Long::class.java) @@ -99,15 +99,14 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration, } } - private val mutex = ThreadBox ({ updatesPublisher }) + private val mutex = ThreadBox({ updatesPublisher }) @Throws(VaultQueryException::class) - override fun _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class): Vault.PageAndUpdates { + override fun _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class): DataFeed, Vault.Update> { return mutex.locked { val snapshotResults = _queryBy(criteria, paging, sorting, contractType) - Vault.PageAndUpdates(snapshotResults, - updatesPublisher.bufferUntilSubscribed() - .filter { it.containsType(contractType, snapshotResults.stateTypes) } ) + val updates = updatesPublisher.bufferUntilSubscribed().filter { it.containsType(contractType, snapshotResults.stateTypes) } + DataFeed(snapshotResults, updates) } } @@ -115,7 +114,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration, * Maintain a list of contract state interfaces to concrete types stored in the vault * for usage in generic queries of type queryBy or queryBy> */ - fun resolveUniqueContractStateTypes(session: EntityManager) : Map> { + fun resolveUniqueContractStateTypes(session: EntityManager): Map> { val criteria = criteriaBuilder.createQuery(String::class.java) val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java) criteria.select(vaultStates.get("contractStateClassName")).distinct(true) @@ -135,7 +134,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration, return contractInterfaceToConcreteTypes } - private fun deriveContractInterfaces(clazz: Class): Set> { + private fun deriveContractInterfaces(clazz: Class): Set> { val myInterfaces: MutableSet> = mutableSetOf() clazz.interfaces.forEach { if (!it.equals(ContractState::class.java)) { diff --git a/node/src/test/java/net/corda/node/services/vault/VaultQueryJavaTests.java b/node/src/test/java/net/corda/node/services/vault/VaultQueryJavaTests.java index 542c98a5db..d3daf97f05 100644 --- a/node/src/test/java/net/corda/node/services/vault/VaultQueryJavaTests.java +++ b/node/src/test/java/net/corda/node/services/vault/VaultQueryJavaTests.java @@ -7,6 +7,7 @@ import net.corda.contracts.asset.*; import net.corda.core.contracts.*; import net.corda.core.crypto.*; import net.corda.core.identity.*; +import net.corda.core.messaging.DataFeed; import net.corda.core.node.services.*; import net.corda.core.node.services.vault.*; import net.corda.core.node.services.vault.QueryCriteria.*; @@ -248,7 +249,7 @@ public class VaultQueryJavaTests { Set> contractStateTypes = new HashSet(Collections.singletonList(Cash.State.class)); VaultQueryCriteria criteria = new VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, contractStateTypes); - Vault.PageAndUpdates results = vaultQuerySvc.trackBy(ContractState.class, criteria); + DataFeed, Vault.Update> results = vaultQuerySvc.trackBy(ContractState.class, criteria); Vault.Page snapshot = results.getCurrent(); Observable updates = results.getFuture(); @@ -289,7 +290,7 @@ public class VaultQueryJavaTests { PageSpecification pageSpec = new PageSpecification(0, getMAX_PAGE_SIZE()); Sort.SortColumn sortByUid = new Sort.SortColumn(new SortAttribute.Standard(Sort.LinearStateAttribute.UUID), Sort.Direction.DESC); Sort sorting = new Sort(ImmutableSet.of(sortByUid)); - Vault.PageAndUpdates results = vaultQuerySvc.trackBy(ContractState.class, compositeCriteria, pageSpec, sorting); + DataFeed, Vault.Update> results = vaultQuerySvc.trackBy(ContractState.class, compositeCriteria, pageSpec, sorting); Vault.Page snapshot = results.getCurrent(); Observable updates = results.getFuture();