mirror of
https://github.com/corda/corda.git
synced 2025-02-21 17:56:54 +00:00
fetch unconsumed vault snapshot instead of ALL (#1587)
This commit is contained in:
parent
b1d1d74d6e
commit
76fef4ed49
@ -9,10 +9,13 @@ import net.corda.core.identity.Party
|
|||||||
import net.corda.core.messaging.*
|
import net.corda.core.messaging.*
|
||||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||||
import net.corda.core.node.services.Vault
|
import net.corda.core.node.services.Vault
|
||||||
import net.corda.core.node.services.vault.*
|
import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.node.services.vault.MAX_PAGE_SIZE
|
||||||
|
import net.corda.core.node.services.vault.PageSpecification
|
||||||
|
import net.corda.core.node.services.vault.QueryCriteria
|
||||||
import net.corda.core.transactions.SignedTransaction
|
import net.corda.core.transactions.SignedTransaction
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
|
|
||||||
@ -86,8 +89,13 @@ class NodeMonitorModel {
|
|||||||
stateMachineUpdates.startWith(currentStateMachines).subscribe(stateMachineUpdatesSubject)
|
stateMachineUpdates.startWith(currentStateMachines).subscribe(stateMachineUpdatesSubject)
|
||||||
|
|
||||||
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
|
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
|
||||||
val (vaultSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
|
val (_, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
|
||||||
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
|
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
|
||||||
|
|
||||||
|
val vaultSnapshot = proxy.vaultQueryBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.UNCONSUMED),
|
||||||
|
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
|
||||||
|
// We have to fetch the snapshot separately since vault query API doesn't allow different criteria for snapshot and updates.
|
||||||
|
// TODO : This will create a small window of opportunity for inconsistent updates, might need to change the vault API to handle this case.
|
||||||
val initialVaultUpdate = Vault.Update(setOf(), vaultSnapshot.states.toSet())
|
val initialVaultUpdate = Vault.Update(setOf(), vaultSnapshot.states.toSet())
|
||||||
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject)
|
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user