mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Add generics to Vault.Update
This commit is contained in:
@ -45,7 +45,7 @@ class CordaRPCOpsImpl(
|
||||
}
|
||||
}
|
||||
|
||||
override fun vaultAndUpdates(): DataFeed<List<StateAndRef<ContractState>>, Vault.Update> {
|
||||
override fun vaultAndUpdates(): DataFeed<List<StateAndRef<ContractState>>, Vault.Update<ContractState>> {
|
||||
return database.transaction {
|
||||
val (vault, updates) = services.vaultService.track()
|
||||
DataFeed(vault.states.toList(), updates)
|
||||
@ -65,7 +65,7 @@ class CordaRPCOpsImpl(
|
||||
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria,
|
||||
paging: PageSpecification,
|
||||
sorting: Sort,
|
||||
contractType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update> {
|
||||
contractType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
||||
return database.transaction {
|
||||
services.vaultQueryService._trackBy(criteria, paging, sorting, contractType)
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ import rx.Observable
|
||||
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
|
||||
*/
|
||||
// TODO: Manage version evolution of the schemas via additional tooling.
|
||||
class HibernateObserver(vaultUpdates: Observable<Vault.Update>, val config: HibernateConfiguration) {
|
||||
class HibernateObserver(vaultUpdates: Observable<Vault.Update<ContractState>>, val config: HibernateConfiguration) {
|
||||
|
||||
companion object {
|
||||
val logger = loggerFor<HibernateObserver>()
|
||||
|
@ -21,6 +21,7 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import rx.subjects.PublishSubject
|
||||
import rx.Observable
|
||||
import java.lang.Exception
|
||||
import java.util.*
|
||||
import javax.persistence.EntityManager
|
||||
@ -28,7 +29,7 @@ import javax.persistence.Tuple
|
||||
|
||||
|
||||
class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
val updatesPublisher: PublishSubject<Vault.Update>) : SingletonSerializeAsToken(), VaultQueryService {
|
||||
val updatesPublisher: PublishSubject<Vault.Update<ContractState>>) : SingletonSerializeAsToken(), VaultQueryService {
|
||||
companion object {
|
||||
val log = loggerFor<HibernateVaultQueryImpl>()
|
||||
}
|
||||
@ -107,7 +108,6 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
}
|
||||
|
||||
return Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults)
|
||||
|
||||
} catch (e: Exception) {
|
||||
log.error(e.message)
|
||||
throw e.cause ?: e
|
||||
@ -118,10 +118,11 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
private val mutex = ThreadBox({ updatesPublisher })
|
||||
|
||||
@Throws(VaultQueryException::class)
|
||||
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update> {
|
||||
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
||||
return mutex.locked {
|
||||
val snapshotResults = _queryBy<T>(criteria, paging, sorting, contractType)
|
||||
val updates = updatesPublisher.bufferUntilSubscribed().filter { it.containsType(contractType, snapshotResults.stateTypes) }
|
||||
val snapshotResults = _queryBy(criteria, paging, sorting, contractType)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val updates = updatesPublisher.bufferUntilSubscribed().filter { it.containsType(contractType, snapshotResults.stateTypes) } as Observable<Vault.Update<T>>
|
||||
DataFeed(snapshotResults, updates)
|
||||
}
|
||||
}
|
||||
|
@ -74,16 +74,16 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
val session = configuration.sessionForModel(Models.VAULT)
|
||||
|
||||
private class InnerState {
|
||||
val _updatesPublisher = PublishSubject.create<Vault.Update>()!!
|
||||
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update>()!!
|
||||
val _updatesPublisher = PublishSubject.create<Vault.Update<ContractState>>()!!
|
||||
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update<ContractState>>()!!
|
||||
val _updatesInDbTx = _updatesPublisher.wrapWithDatabaseTransaction().asObservable()!!
|
||||
|
||||
// For use during publishing only.
|
||||
val updatesPublisher: rx.Observer<Vault.Update> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
|
||||
val updatesPublisher: rx.Observer<Vault.Update<ContractState>> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
|
||||
}
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
private fun recordUpdate(update: Vault.Update): Vault.Update {
|
||||
private fun recordUpdate(update: Vault.Update<ContractState>): Vault.Update<ContractState> {
|
||||
if (update != Vault.NoUpdate) {
|
||||
val producedStateRefs = update.produced.map { it.ref }
|
||||
val producedStateRefsMap = update.produced.associateBy { it.ref }
|
||||
@ -126,16 +126,16 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
return update
|
||||
}
|
||||
|
||||
override val rawUpdates: Observable<Vault.Update>
|
||||
override val rawUpdates: Observable<Vault.Update<ContractState>>
|
||||
get() = mutex.locked { _rawUpdatesPublisher }
|
||||
|
||||
override val updates: Observable<Vault.Update>
|
||||
override val updates: Observable<Vault.Update<ContractState>>
|
||||
get() = mutex.locked { _updatesInDbTx }
|
||||
|
||||
override val updatesPublisher: PublishSubject<Vault.Update>
|
||||
override val updatesPublisher: PublishSubject<Vault.Update<ContractState>>
|
||||
get() = mutex.locked { _updatesPublisher }
|
||||
|
||||
override fun track(): DataFeed<Vault<ContractState>, Vault.Update> {
|
||||
override fun track(): DataFeed<Vault<ContractState>, Vault.Update<ContractState>> {
|
||||
return mutex.locked {
|
||||
DataFeed(Vault(unconsumedStates<ContractState>()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
}
|
||||
@ -421,7 +421,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
= txState.copy(data = txState.data.copy(amount = amount, owner = owner))
|
||||
|
||||
@VisibleForTesting
|
||||
internal fun makeUpdate(tx: WireTransaction, ourKeys: Set<PublicKey>): Vault.Update {
|
||||
internal fun makeUpdate(tx: WireTransaction, ourKeys: Set<PublicKey>): Vault.Update<ContractState> {
|
||||
val ourNewStates = tx.filterOutRefs<ContractState> { isRelevant(it, ourKeys) }
|
||||
|
||||
// Retrieve all unconsumed states for this transaction's inputs
|
||||
|
@ -288,10 +288,10 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
Set<Class<ContractState>> contractStateTypes = new HashSet(Collections.singletonList(Cash.State.class));
|
||||
|
||||
VaultQueryCriteria criteria = new VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, contractStateTypes);
|
||||
DataFeed<Vault.Page<ContractState>, Vault.Update> results = vaultQuerySvc.trackBy(ContractState.class, criteria);
|
||||
DataFeed<Vault.Page<ContractState>, Vault.Update<ContractState>> results = vaultQuerySvc.trackBy(ContractState.class, criteria);
|
||||
|
||||
Vault.Page<ContractState> snapshot = results.getSnapshot();
|
||||
Observable<Vault.Update> updates = results.getUpdates();
|
||||
Observable<Vault.Update<ContractState>> updates = results.getUpdates();
|
||||
|
||||
// DOCEND VaultJavaQueryExample4
|
||||
assertThat(snapshot.getStates()).hasSize(3);
|
||||
@ -325,10 +325,10 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
|
||||
PageSpecification pageSpec = new PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE);
|
||||
Sort.SortColumn sortByUid = new Sort.SortColumn(new SortAttribute.Standard(Sort.LinearStateAttribute.UUID), Sort.Direction.DESC);
|
||||
Sort sorting = new Sort(ImmutableSet.of(sortByUid));
|
||||
DataFeed<Vault.Page<ContractState>, Vault.Update> results = vaultQuerySvc.trackBy(ContractState.class, compositeCriteria, pageSpec, sorting);
|
||||
DataFeed<Vault.Page<ContractState>, Vault.Update<ContractState>> results = vaultQuerySvc.trackBy(ContractState.class, compositeCriteria, pageSpec, sorting);
|
||||
|
||||
Vault.Page<ContractState> snapshot = results.getSnapshot();
|
||||
Observable<Vault.Update> updates = results.getUpdates();
|
||||
Observable<Vault.Update<ContractState>> updates = results.getUpdates();
|
||||
// DOCEND VaultJavaQueryExample5
|
||||
|
||||
assertThat(snapshot.getStates()).hasSize(13);
|
||||
|
@ -53,8 +53,8 @@ class CordaRPCOpsImplTest {
|
||||
lateinit var rpc: CordaRPCOps
|
||||
lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
|
||||
lateinit var transactions: Observable<SignedTransaction>
|
||||
lateinit var vaultUpdates: Observable<Vault.Update> // TODO: deprecated
|
||||
lateinit var vaultTrackCash: Observable<Vault.Update>
|
||||
lateinit var vaultUpdates: Observable<Vault.Update<ContractState>> // TODO: deprecated
|
||||
lateinit var vaultTrackCash: Observable<Vault.Update<Cash.State>>
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
|
@ -86,7 +86,7 @@ class HibernateObserverTests {
|
||||
@Test
|
||||
fun testChildObjectsArePersisted() {
|
||||
val testSchema = object : MappedSchema(SchemaFamily::class.java, 1, setOf(Parent::class.java, Child::class.java)) {}
|
||||
val rawUpdatesPublisher = PublishSubject.create<Vault.Update>()
|
||||
val rawUpdatesPublisher = PublishSubject.create<Vault.Update<ContractState>>()
|
||||
val schemaService = object : SchemaService {
|
||||
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = emptyMap()
|
||||
|
||||
|
@ -449,7 +449,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
|
||||
@Test
|
||||
fun `make update`() {
|
||||
val service = (services.vaultService as NodeVaultService)
|
||||
val vaultSubscriber = TestSubscriber<Vault.Update>().apply {
|
||||
val vaultSubscriber = TestSubscriber<Vault.Update<*>>().apply {
|
||||
service.updates.subscribe(this)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user