mirror of
https://github.com/corda/corda.git
synced 2025-04-07 11:27:01 +00:00
ENT-1389 Modify the HibernateObserver to persist states by schema (and only create a session per schema, not one per state per schema) (#2366)
This commit is contained in:
parent
df195b20bd
commit
5e7d2f00ae
@ -10,11 +10,18 @@ import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
|
||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
||||
import org.hibernate.FlushMode
|
||||
import rx.Observable
|
||||
|
||||
|
||||
/**
|
||||
* Small data class bundling together a ContractState and a StateRef (as opposed to a TransactionState and StateRef
|
||||
* in StateAndRef)
|
||||
*/
|
||||
data class ContractStateAndRef(val state: ContractState, val ref: StateRef)
|
||||
|
||||
/**
|
||||
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
|
||||
*/
|
||||
@ -30,27 +37,33 @@ class HibernateObserver private constructor(private val config: HibernateConfigu
|
||||
}
|
||||
|
||||
private fun persist(produced: Set<StateAndRef<ContractState>>) {
|
||||
produced.forEach { persistState(it) }
|
||||
}
|
||||
|
||||
private fun persistState(stateAndRef: StateAndRef<ContractState>) {
|
||||
val state = stateAndRef.state.data
|
||||
log.debug { "Asked to persist state ${stateAndRef.ref}" }
|
||||
schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
|
||||
val stateBySchema: MutableMap<MappedSchema, MutableList<ContractStateAndRef>> = mutableMapOf()
|
||||
// map all states by their referenced schemas
|
||||
produced.forEach {
|
||||
val contractStateAndRef = ContractStateAndRef(it.state.data, it.ref)
|
||||
log.debug { "Asked to persist state ${it.ref}" }
|
||||
schemaService.selectSchemas(contractStateAndRef.state).forEach {
|
||||
stateBySchema.getOrPut(it) { mutableListOf() }.add(contractStateAndRef)
|
||||
}
|
||||
}
|
||||
// then persist all states for each schema
|
||||
stateBySchema.forEach { persistStatesWithSchema(it.value, it.key) }
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
internal fun persistStateWithSchema(state: ContractState, stateRef: StateRef, schema: MappedSchema) {
|
||||
internal fun persistStatesWithSchema(statesAndRefs: List<ContractStateAndRef>, schema: MappedSchema) {
|
||||
val sessionFactory = config.sessionFactoryForSchemas(setOf(schema))
|
||||
val session = sessionFactory.withOptions().
|
||||
connection(DatabaseTransactionManager.current().connection).
|
||||
flushMode(FlushMode.MANUAL).
|
||||
openSession()
|
||||
session.use {
|
||||
val mappedObject = schemaService.generateMappedObject(state, schema)
|
||||
mappedObject.stateRef = PersistentStateRef(stateRef)
|
||||
it.persist(mappedObject)
|
||||
it.flush()
|
||||
session.use { thisSession ->
|
||||
statesAndRefs.forEach {
|
||||
val mappedObject = schemaService.generateMappedObject(it.state, schema)
|
||||
mappedObject.stateRef = PersistentStateRef(it.ref)
|
||||
thisSession.persist(mappedObject)
|
||||
}
|
||||
thisSession.flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -28,10 +28,11 @@ import net.corda.finance.schemas.CashSchemaV1
|
||||
import net.corda.finance.schemas.SampleCashSchemaV2
|
||||
import net.corda.finance.schemas.SampleCashSchemaV3
|
||||
import net.corda.finance.utils.sumCash
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.schema.ContractStateAndRef
|
||||
import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.vault.VaultSchemaV1
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -503,11 +504,12 @@ class HibernateConfigurationTest {
|
||||
fun `count CashStates in V2`() {
|
||||
database.transaction {
|
||||
// persist cash states explicitly with V2 schema
|
||||
cashStates.forEach {
|
||||
val stateAndRefs = cashStates.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV2)
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
}
|
||||
hibernatePersister.persistStatesWithSchema(stateAndRefs, SampleCashSchemaV2)
|
||||
}
|
||||
|
||||
// structure query
|
||||
@ -525,11 +527,12 @@ class HibernateConfigurationTest {
|
||||
database.transaction {
|
||||
vaultFiller.fillWithSomeTestLinearStates(5)
|
||||
// persist cash states explicitly with V2 schema
|
||||
cashStates.forEach {
|
||||
val stateAndRefs = cashStates.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV2)
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
}
|
||||
hibernatePersister.persistStatesWithSchema(stateAndRefs, SampleCashSchemaV2)
|
||||
}
|
||||
|
||||
// structure query
|
||||
@ -620,11 +623,12 @@ class HibernateConfigurationTest {
|
||||
fun `select fungible states by owner party`() {
|
||||
database.transaction {
|
||||
// persist original cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
val stateAndRefs = cashStates.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
}
|
||||
hibernatePersister.persistStatesWithSchema(stateAndRefs, SampleCashSchemaV3)
|
||||
}
|
||||
|
||||
// structure query
|
||||
@ -643,19 +647,20 @@ class HibernateConfigurationTest {
|
||||
fun `query fungible states by owner party`() {
|
||||
database.transaction {
|
||||
// persist original cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
val stateAndRefs: MutableList<ContractStateAndRef> = cashStates.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
}.toMutableList()
|
||||
vaultFiller.fillWithSomeTestCash(100.DOLLARS, issuerServices, 2, issuer.ref(1), ALICE, Random(0L))
|
||||
val cashStates = vaultFiller.fillWithSomeTestCash(100.DOLLARS, services, 2, identity.ref(0)).states
|
||||
// persist additional cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
stateAndRefs.addAll(cashStates.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
})
|
||||
hibernatePersister.persistStatesWithSchema(stateAndRefs, SampleCashSchemaV3)
|
||||
}
|
||||
val sessionFactory = sessionFactoryForSchemas(VaultSchemaV1, CommonSchemaV1, SampleCashSchemaV3)
|
||||
val criteriaBuilder = sessionFactory.criteriaBuilder
|
||||
@ -694,12 +699,13 @@ class HibernateConfigurationTest {
|
||||
@Test
|
||||
fun `select fungible states by participants`() {
|
||||
database.transaction {
|
||||
// persist cash states explicitly with V2 schema
|
||||
cashStates.forEach {
|
||||
// persist cash states explicitly with V3 schema
|
||||
val stateAndRefs = cashStates.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
}
|
||||
hibernatePersister.persistStatesWithSchema(stateAndRefs, SampleCashSchemaV3)
|
||||
}
|
||||
|
||||
// structure query
|
||||
@ -720,25 +726,27 @@ class HibernateConfigurationTest {
|
||||
val firstCashState =
|
||||
database.transaction {
|
||||
// persist original cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
val stateAndRefs: MutableList<ContractStateAndRef> = cashStates.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
}.toMutableList()
|
||||
|
||||
val moreCash = vaultFiller.fillWithSomeTestCash(100.DOLLARS, services, 2, identity.ref(0), identity, Random(0L)).states
|
||||
// persist additional cash states explicitly with V3 schema
|
||||
moreCash.forEach {
|
||||
stateAndRefs.addAll(moreCash.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
})
|
||||
val cashStates = vaultFiller.fillWithSomeTestCash(100.DOLLARS, issuerServices, 2, issuer.ref(1), ALICE, Random(0L)).states
|
||||
// persist additional cash states explicitly with V3 schema
|
||||
cashStates.forEach {
|
||||
stateAndRefs.addAll(cashStates.map {
|
||||
val cashState = it.state.data
|
||||
val dummyFungibleState = DummyFungibleContract.State(cashState.amount, cashState.owner)
|
||||
hibernatePersister.persistStateWithSchema(dummyFungibleState, it.ref, SampleCashSchemaV3)
|
||||
}
|
||||
ContractStateAndRef(dummyFungibleState, it.ref)
|
||||
})
|
||||
hibernatePersister.persistStatesWithSchema(stateAndRefs, SampleCashSchemaV3)
|
||||
cashStates.first()
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user