CORDA-1866 Avoid circular flushing in our hibernate column converters. (#3737)

This commit is contained in:
Rick Parker 2018-08-02 10:08:12 +01:00 committed by GitHub
parent 055ba90e0d
commit ff298e17e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 100 additions and 4 deletions

View File

@ -1,6 +1,7 @@
package net.corda.nodeapi.internal.persistence
import co.paralleluniverse.strands.Strand
import org.hibernate.BaseSessionEventListener
import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.PublishSubject
@ -21,6 +22,9 @@ class DatabaseTransaction(
) {
val id: UUID = UUID.randomUUID()
val flushing: Boolean get() = _flushingCount > 0
private var _flushingCount = 0
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
database.dataSource.connection.apply {
autoCommit = false
@ -30,6 +34,27 @@ class DatabaseTransaction(
private val sessionDelegate = lazy {
val session = database.entityManagerFactory.withOptions().connection(connection).openSession()
session.addEventListeners(object : BaseSessionEventListener() {
override fun flushStart() {
_flushingCount++
super.flushStart()
}
override fun flushEnd(numberOfEntities: Int, numberOfCollections: Int) {
super.flushEnd(numberOfEntities, numberOfCollections)
_flushingCount--
}
override fun partialFlushStart() {
_flushingCount++
super.partialFlushStart()
}
override fun partialFlushEnd(numberOfEntities: Int, numberOfCollections: Int) {
super.partialFlushEnd(numberOfEntities, numberOfCollections)
_flushingCount--
}
})
hibernateTransaction = session.beginTransaction()
session
}

View File

@ -32,6 +32,7 @@ class E2ETestKeyManagementService(val identityService: IdentityService) : Single
private val mutex = ThreadBox(InnerState())
// Accessing this map clones it.
override val keys: Set<PublicKey> get() = mutex.locked { keys.keys }
val keyPairs: Set<KeyPair> get() = mutex.locked { keys.map { KeyPair(it.key, it.value) }.toSet() }
override fun start(initialKeyPairs: Set<KeyPair>) {
mutex.locked {

View File

@ -124,11 +124,14 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
protected fun loadValue(key: K): V? {
val session = currentDBSession()
// IMPORTANT: The flush is needed because detach() makes the queue of unflushed entries invalid w.r.t. Hibernate internal state if the found entity is unflushed.
// We want the detach() so that we rely on our cache memory management and don't retain strong references in the Hibernate session.
session.flush()
val flushing = contextTransaction.flushing
if (!flushing) {
// IMPORTANT: The flush is needed because detach() makes the queue of unflushed entries invalid w.r.t. Hibernate internal state if the found entity is unflushed.
// We want the detach() so that we rely on our cache memory management and don't retain strong references in the Hibernate session.
session.flush()
}
val result = session.find(persistentEntityClass, toPersistentEntityKey(key))
return result?.apply { session.detach(result) }?.let(fromPersistentEntity)?.second
return result?.apply { if (!flushing) session.detach(result) }?.let(fromPersistentEntity)?.second
}
operator fun contains(key: K) = get(key) != null

View File

@ -0,0 +1,67 @@
package net.corda.node.services.persistence
import net.corda.core.identity.Party
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.`issued by`
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.testing.core.BOC_NAME
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.StartedMockNode
import org.junit.After
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
class HibernateColumnConverterTests {
private lateinit var mockNet: MockNetwork
private lateinit var bankOfCordaNode: StartedMockNode
private lateinit var bankOfCorda: Party
private lateinit var notary: Party
@Before
fun start() {
mockNet = MockNetwork(
servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin(),
cordappPackages = listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas"))
bankOfCordaNode = mockNet.createPartyNode(BOC_NAME)
bankOfCorda = bankOfCordaNode.info.identityFromX500Name(BOC_NAME)
notary = mockNet.defaultNotaryIdentity
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a
// cache miss was doing a flush. This also checks that loading during flush does actually work.
@Test
fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() {
val expected = 500.DOLLARS
val ref = OpaqueBytes.of(0x01)
// Create parallel set of key and identity services so that the values are not cached, forcing the node caches to do a lookup.
val identityService = PersistentIdentityService()
val originalIdentityService: PersistentIdentityService = bankOfCordaNode.services.identityService as PersistentIdentityService
identityService.database = originalIdentityService.database
identityService.start(originalIdentityService.trustRoot)
val keyService = E2ETestKeyManagementService(identityService)
keyService.start((bankOfCordaNode.services.keyManagementService as E2ETestKeyManagementService).keyPairs)
// New identity for a notary (doesn't matter that it's for Bank Of Corda... since not going to use it as an actual notary etc).
val newKeyAndCert = keyService.freshKeyAndCert(bankOfCordaNode.info.legalIdentitiesAndCerts[0], false)
val randomNotary = Party(BOC_NAME, newKeyAndCert.owningKey)
val future = bankOfCordaNode.startFlow(CashIssueFlow(expected, ref, randomNotary))
mockNet.runNetwork()
val issueTx = future.getOrThrow().stx
val output = issueTx.tx.outputsOfType<Cash.State>().single()
assertEquals(expected.`issued by`(bankOfCorda.ref(ref)), output.amount)
}
}