mirror of
https://github.com/corda/corda.git
synced 2024-12-24 07:06:44 +00:00
[CORDA-3303] - Avoid flushing when inside a cascade (#5575)
* [CORDA-3303] - Avoid flushing when inside a cascade * Remove listener infrastructure
This commit is contained in:
parent
48fd78d059
commit
7666ca0d80
File diff suppressed because one or more lines are too long
@ -25,9 +25,6 @@ class DatabaseTransaction(
|
||||
) {
|
||||
val id: UUID = UUID.randomUUID()
|
||||
|
||||
val flushing: Boolean get() = _flushingCount > 0
|
||||
private var _flushingCount = 0
|
||||
|
||||
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
database.dataSource.connection.apply {
|
||||
autoCommit = false
|
||||
@ -37,27 +34,6 @@ class DatabaseTransaction(
|
||||
|
||||
private val sessionDelegate = lazy {
|
||||
val session = database.entityManagerFactory.withOptions().connection(connection).openSession()
|
||||
session.addEventListeners(object : BaseSessionEventListener() {
|
||||
override fun flushStart() {
|
||||
_flushingCount++
|
||||
super.flushStart()
|
||||
}
|
||||
|
||||
override fun flushEnd(numberOfEntities: Int, numberOfCollections: Int) {
|
||||
super.flushEnd(numberOfEntities, numberOfCollections)
|
||||
_flushingCount--
|
||||
}
|
||||
|
||||
override fun partialFlushStart() {
|
||||
_flushingCount++
|
||||
super.partialFlushStart()
|
||||
}
|
||||
|
||||
override fun partialFlushEnd(numberOfEntities: Int, numberOfCollections: Int) {
|
||||
super.partialFlushEnd(numberOfEntities, numberOfCollections)
|
||||
_flushingCount--
|
||||
}
|
||||
})
|
||||
hibernateTransaction = session.beginTransaction()
|
||||
session
|
||||
}
|
||||
|
@ -7,6 +7,8 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import org.hibernate.Session
|
||||
import org.hibernate.internal.SessionImpl
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
@ -191,14 +193,23 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
|
||||
|
||||
private fun loadValue(key: K): V? {
|
||||
val session = currentDBSession()
|
||||
val flushing = contextTransaction.flushing
|
||||
if (!flushing) {
|
||||
val isSafeToDetach = isSafeToFlushAndDetach(session)
|
||||
if (isSafeToDetach) {
|
||||
// IMPORTANT: The flush is needed because detach() makes the queue of unflushed entries invalid w.r.t. Hibernate internal state if the found entity is unflushed.
|
||||
// We want the detach() so that we rely on our cache memory management and don't retain strong references in the Hibernate session.
|
||||
session.flush()
|
||||
}
|
||||
val result = session.find(persistentEntityClass, toPersistentEntityKey(key))
|
||||
return result?.apply { if (!flushing) session.detach(result) }?.let(fromPersistentEntity)?.second
|
||||
return result?.apply { if (isSafeToDetach) session.detach(result) }?.let(fromPersistentEntity)?.second
|
||||
}
|
||||
|
||||
private fun isSafeToFlushAndDetach(session: Session): Boolean {
|
||||
if (session !is SessionImpl)
|
||||
return true
|
||||
|
||||
val flushInProgress = session.persistenceContext.isFlushing
|
||||
val cascadeInProgress = session.persistenceContext.cascadeLevel > 0
|
||||
return !flushInProgress && !cascadeInProgress
|
||||
}
|
||||
|
||||
protected fun transactionalLoadValue(key: K): Transactional<V> {
|
||||
|
@ -1,82 +0,0 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.issuedBy
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.E2ETestKeyManagementService
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class HibernateColumnConverterTests {
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
private val cordapps = listOf("net.corda.finance")
|
||||
|
||||
private val myself = TestIdentity(CordaX500Name("Me", "London", "GB"))
|
||||
private val notary = TestIdentity(CordaX500Name("NotaryService", "London", "GB"), 1337L)
|
||||
|
||||
lateinit var services: MockServices
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices(
|
||||
cordappPackages = cordapps,
|
||||
initialIdentity = myself,
|
||||
networkParameters = testNetworkParameters(minimumPlatformVersion = 4),
|
||||
moreIdentities = setOf(notary.identity),
|
||||
moreKeys = emptySet()
|
||||
)
|
||||
services = mockServices
|
||||
database = db
|
||||
}
|
||||
|
||||
// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a
|
||||
// cache miss was doing a flush. This also checks that loading during flush does actually work.
|
||||
@Test
|
||||
fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() {
|
||||
val expected = 500.DOLLARS
|
||||
val ref = OpaqueBytes.of(0x01)
|
||||
|
||||
// Create parallel set of key and identity services so that the values are not cached, forcing the node caches to do a lookup.
|
||||
val cacheFactory = TestingNamedCacheFactory()
|
||||
val identityService = PersistentIdentityService(cacheFactory)
|
||||
val originalIdentityService: PersistentIdentityService = services.identityService as PersistentIdentityService
|
||||
identityService.database = originalIdentityService.database
|
||||
identityService.start(originalIdentityService.trustRoot, pkToIdCache = PublicKeyToOwningIdentityCacheImpl(database, cacheFactory))
|
||||
val keyService = E2ETestKeyManagementService(identityService)
|
||||
keyService.start(setOf(myself.keyPair))
|
||||
|
||||
// New identity for a notary (doesn't matter that it's for Bank Of Corda... since not going to use it as an actual notary etc).
|
||||
val newKeyAndCert = keyService.freshKeyAndCert(services.myInfo.legalIdentitiesAndCerts[0], false)
|
||||
val randomNotary = Party(myself.name, newKeyAndCert.owningKey)
|
||||
|
||||
val ourIdentity = services.myInfo.legalIdentities.first()
|
||||
val builder = TransactionBuilder(notary.party)
|
||||
val issuer = services.myInfo.legalIdentities.first().ref(ref)
|
||||
val signers = Cash().generateIssue(builder, expected.issuedBy(issuer), ourIdentity, randomNotary)
|
||||
val tx: SignedTransaction = services.signInitialTransaction(builder, signers)
|
||||
services.recordTransactions(tx)
|
||||
|
||||
val output = tx.tx.outputsOfType<Cash.State>().single()
|
||||
assertEquals(expected.`issued by`(ourIdentity.ref(ref)), output.amount)
|
||||
}
|
||||
}
|
@ -0,0 +1,168 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.contracts.BelongsToContract
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.contracts.TypeOnlyCommandData
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.issuedBy
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.hibernate.annotations.Cascade
|
||||
import org.hibernate.annotations.CascadeType
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.lang.IllegalArgumentException
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
import javax.persistence.OneToMany
|
||||
import javax.persistence.Table
|
||||
import javax.persistence.GeneratedValue
|
||||
import javax.persistence.GenerationType
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
/**
|
||||
* These tests cover the interactions between Corda and Hibernate with regards to flushing/detaching/cascading.
|
||||
*/
|
||||
class HibernateInteractionTests {
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
private val cordapps = listOf("net.corda.finance", "net.corda.node.services.persistence")
|
||||
|
||||
private val myself = TestIdentity(CordaX500Name("Me", "London", "GB"))
|
||||
private val notary = TestIdentity(CordaX500Name("NotaryService", "London", "GB"), 1337L)
|
||||
|
||||
lateinit var services: MockServices
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices(
|
||||
cordappPackages = cordapps,
|
||||
initialIdentity = myself,
|
||||
networkParameters = testNetworkParameters(minimumPlatformVersion = 4),
|
||||
moreIdentities = setOf(notary.identity),
|
||||
moreKeys = emptySet(),
|
||||
// forcing a cache size of zero, so that all requests lead to a cache miss and end up hitting the database
|
||||
cacheFactory = TestingNamedCacheFactory(0)
|
||||
)
|
||||
services = mockServices
|
||||
database = db
|
||||
}
|
||||
|
||||
// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a
|
||||
// cache miss was doing a flush. This also checks that loading during flush does actually work.
|
||||
@Test
|
||||
fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() {
|
||||
val expected = 500.DOLLARS
|
||||
val ref = OpaqueBytes.of(0x01)
|
||||
|
||||
val ourIdentity = services.myInfo.legalIdentities.first()
|
||||
val builder = TransactionBuilder(notary.party)
|
||||
val issuer = services.myInfo.legalIdentities.first().ref(ref)
|
||||
val signers = Cash().generateIssue(builder, expected.issuedBy(issuer), ourIdentity, notary.party)
|
||||
val tx: SignedTransaction = services.signInitialTransaction(builder, signers)
|
||||
services.recordTransactions(tx)
|
||||
|
||||
val output = tx.tx.outputsOfType<Cash.State>().single()
|
||||
assertEquals(expected.`issued by`(ourIdentity.ref(ref)), output.amount)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `when a cascade is in progress (because of nested entities), the node avoids to flush & detach entities, since it's not allowed by Hibernate`() {
|
||||
val ourIdentity = services.myInfo.legalIdentities.first()
|
||||
|
||||
val childEntities = listOf(SimpleContract.ChildState(ourIdentity))
|
||||
val parentEntity = SimpleContract.ParentState(childEntities)
|
||||
|
||||
val builder = TransactionBuilder(notary.party)
|
||||
.addOutputState(TransactionState(parentEntity, SimpleContract::class.java.name, notary.party))
|
||||
.addCommand(SimpleContract.Issue(), listOf(ourIdentity.owningKey))
|
||||
val tx: SignedTransaction = services.signInitialTransaction(builder, listOf(ourIdentity.owningKey))
|
||||
services.recordTransactions(tx)
|
||||
|
||||
val output = tx.tx.outputsOfType<SimpleContract.ParentState>().single()
|
||||
assertThat(output.children.single().member).isEqualTo(ourIdentity)
|
||||
}
|
||||
|
||||
object PersistenceSchema: MappedSchema(PersistenceSchema::class.java, 1, listOf(Parent::class.java, Child::class.java)) {
|
||||
|
||||
@Entity(name = "parents")
|
||||
@Table
|
||||
class Parent: PersistentState() {
|
||||
|
||||
@Cascade(CascadeType.ALL)
|
||||
@OneToMany(targetEntity = Child::class)
|
||||
val children: MutableCollection<Child> = mutableSetOf()
|
||||
|
||||
fun addChild(child: Child) {
|
||||
children.add(child)
|
||||
}
|
||||
}
|
||||
|
||||
@Entity(name = "children")
|
||||
class Child(
|
||||
@Id
|
||||
// Do not change this: this generation type is required in order to trigger the proper cascade ordering.
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
val identifier: Int?,
|
||||
|
||||
val member: AbstractParty?
|
||||
) {
|
||||
constructor(member: AbstractParty): this(null, member)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class SimpleContract: Contract {
|
||||
|
||||
@BelongsToContract(SimpleContract::class)
|
||||
@CordaSerializable
|
||||
data class ParentState(val children: List<ChildState>): ContractState, QueryableState {
|
||||
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(PersistenceSchema)
|
||||
|
||||
override fun generateMappedObject(schema: MappedSchema): PersistentState {
|
||||
return when(schema) {
|
||||
is PersistenceSchema -> {
|
||||
val parent = PersistenceSchema.Parent()
|
||||
children.forEach { parent.addChild(PersistenceSchema.Child(it.member)) }
|
||||
parent
|
||||
}
|
||||
else -> throw IllegalArgumentException("Unrecognised schema $schema")
|
||||
}
|
||||
}
|
||||
|
||||
override val participants: List<AbstractParty> = children.map { it.member }
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class ChildState(val member: AbstractParty)
|
||||
|
||||
override fun verify(tx: LedgerTransaction) {}
|
||||
|
||||
class Issue: TypeOnlyCommandData()
|
||||
}
|
||||
|
||||
}
|
@ -139,10 +139,12 @@ open class MockServices private constructor(
|
||||
* Makes database and persistent services appropriate for unit tests which require persistence across the vault, identity service
|
||||
* and key managment service.
|
||||
*
|
||||
* @param cordappPackages A [List] of cordapp packages to scan for any cordapp code, e.g. contract verification code, flows and services.
|
||||
* @param cordappPackages A [List] of cordapp packages to scan for any cordapp code, e.g. contract verification code,
|
||||
* flows and services.
|
||||
* @param initialIdentity The first (typically sole) identity the services will represent.
|
||||
* @param moreKeys A list of additional [KeyPair] instances to be used by [MockServices].
|
||||
* @param moreIdentities A list of additional [KeyPair] instances to be used by [MockServices].
|
||||
* @param cacheFactory A custom cache factory to be used by the created [IdentityService]
|
||||
* @return A pair where the first element is the instance of [CordaPersistence] and the second is [MockServices].
|
||||
*/
|
||||
@JvmStatic
|
||||
@ -152,12 +154,13 @@ open class MockServices private constructor(
|
||||
initialIdentity: TestIdentity,
|
||||
networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN),
|
||||
moreKeys: Set<KeyPair>,
|
||||
moreIdentities: Set<PartyAndCertificate>
|
||||
moreIdentities: Set<PartyAndCertificate>,
|
||||
cacheFactory: TestingNamedCacheFactory = TestingNamedCacheFactory()
|
||||
): Pair<CordaPersistence, MockServices> {
|
||||
val cordappLoader = cordappLoaderForPackages(cordappPackages)
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
|
||||
val identityService = PersistentIdentityService(TestingNamedCacheFactory())
|
||||
val identityService = PersistentIdentityService(cacheFactory)
|
||||
val persistence = configureDatabase(
|
||||
hikariProperties = dataSourceProps,
|
||||
databaseConfig = DatabaseConfig(),
|
||||
@ -167,7 +170,7 @@ open class MockServices private constructor(
|
||||
internalSchemas = schemaService.internalSchemas()
|
||||
)
|
||||
|
||||
val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, TestingNamedCacheFactory())
|
||||
val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, cacheFactory)
|
||||
|
||||
// Create a persistent identity service and add all the supplied identities.
|
||||
identityService.apply {
|
||||
|
Loading…
Reference in New Issue
Block a user