From ff298e17e1ae66b892e36a102bf24ce7033fab04 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Thu, 2 Aug 2018 10:08:12 +0100 Subject: [PATCH] CORDA-1866 Avoid circular flushing in our hibernate column converters. (#3737) --- .../persistence/DatabaseTransaction.kt | 25 +++++++ .../keys/E2ETestKeyManagementService.kt | 1 + .../node/utilities/AppendOnlyPersistentMap.kt | 11 +-- .../HibernateColumnConverterTests.kt | 67 +++++++++++++++++++ 4 files changed, 100 insertions(+), 4 deletions(-) create mode 100644 node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt index 578479d39a..770477e173 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt @@ -1,6 +1,7 @@ package net.corda.nodeapi.internal.persistence import co.paralleluniverse.strands.Strand +import org.hibernate.BaseSessionEventListener import org.hibernate.Session import org.hibernate.Transaction import rx.subjects.PublishSubject @@ -21,6 +22,9 @@ class DatabaseTransaction( ) { val id: UUID = UUID.randomUUID() + val flushing: Boolean get() = _flushingCount > 0 + private var _flushingCount = 0 + val connection: Connection by lazy(LazyThreadSafetyMode.NONE) { database.dataSource.connection.apply { autoCommit = false @@ -30,6 +34,27 @@ class DatabaseTransaction( private val sessionDelegate = lazy { val session = database.entityManagerFactory.withOptions().connection(connection).openSession() + session.addEventListeners(object : BaseSessionEventListener() { + override fun flushStart() { + _flushingCount++ + super.flushStart() + } + + override fun flushEnd(numberOfEntities: Int, numberOfCollections: Int) { + super.flushEnd(numberOfEntities, numberOfCollections) + _flushingCount-- + } + + override fun partialFlushStart() { + _flushingCount++ + super.partialFlushStart() + } + + override fun partialFlushEnd(numberOfEntities: Int, numberOfCollections: Int) { + super.partialFlushEnd(numberOfEntities, numberOfCollections) + _flushingCount-- + } + }) hibernateTransaction = session.beginTransaction() session } diff --git a/node/src/main/kotlin/net/corda/node/services/keys/E2ETestKeyManagementService.kt b/node/src/main/kotlin/net/corda/node/services/keys/E2ETestKeyManagementService.kt index 5233041642..c99732a356 100644 --- a/node/src/main/kotlin/net/corda/node/services/keys/E2ETestKeyManagementService.kt +++ b/node/src/main/kotlin/net/corda/node/services/keys/E2ETestKeyManagementService.kt @@ -32,6 +32,7 @@ class E2ETestKeyManagementService(val identityService: IdentityService) : Single private val mutex = ThreadBox(InnerState()) // Accessing this map clones it. override val keys: Set get() = mutex.locked { keys.keys } + val keyPairs: Set get() = mutex.locked { keys.map { KeyPair(it.key, it.value) }.toSet() } override fun start(initialKeyPairs: Set) { mutex.locked { diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt index 19db9ef18d..1fa769b77f 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt @@ -124,11 +124,14 @@ abstract class AppendOnlyPersistentMapBase( protected fun loadValue(key: K): V? { val session = currentDBSession() - // IMPORTANT: The flush is needed because detach() makes the queue of unflushed entries invalid w.r.t. Hibernate internal state if the found entity is unflushed. - // We want the detach() so that we rely on our cache memory management and don't retain strong references in the Hibernate session. - session.flush() + val flushing = contextTransaction.flushing + if (!flushing) { + // IMPORTANT: The flush is needed because detach() makes the queue of unflushed entries invalid w.r.t. Hibernate internal state if the found entity is unflushed. + // We want the detach() so that we rely on our cache memory management and don't retain strong references in the Hibernate session. + session.flush() + } val result = session.find(persistentEntityClass, toPersistentEntityKey(key)) - return result?.apply { session.detach(result) }?.let(fromPersistentEntity)?.second + return result?.apply { if (!flushing) session.detach(result) }?.let(fromPersistentEntity)?.second } operator fun contains(key: K) = get(key) != null diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt new file mode 100644 index 0000000000..4832474cac --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt @@ -0,0 +1,67 @@ +package net.corda.node.services.persistence + +import net.corda.core.identity.Party +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.getOrThrow +import net.corda.finance.DOLLARS +import net.corda.finance.`issued by` +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.flows.CashIssueFlow +import net.corda.node.services.identity.PersistentIdentityService +import net.corda.node.services.keys.E2ETestKeyManagementService +import net.corda.testing.core.BOC_NAME +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.MockNetwork +import net.corda.testing.node.StartedMockNode +import org.junit.After +import org.junit.Before +import org.junit.Test +import kotlin.test.assertEquals + +class HibernateColumnConverterTests { + private lateinit var mockNet: MockNetwork + private lateinit var bankOfCordaNode: StartedMockNode + private lateinit var bankOfCorda: Party + private lateinit var notary: Party + + @Before + fun start() { + mockNet = MockNetwork( + servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin(), + cordappPackages = listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas")) + bankOfCordaNode = mockNet.createPartyNode(BOC_NAME) + bankOfCorda = bankOfCordaNode.info.identityFromX500Name(BOC_NAME) + notary = mockNet.defaultNotaryIdentity + } + + @After + fun cleanUp() { + mockNet.stopNodes() + } + + // AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a + // cache miss was doing a flush. This also checks that loading during flush does actually work. + @Test + fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() { + val expected = 500.DOLLARS + val ref = OpaqueBytes.of(0x01) + + // Create parallel set of key and identity services so that the values are not cached, forcing the node caches to do a lookup. + val identityService = PersistentIdentityService() + val originalIdentityService: PersistentIdentityService = bankOfCordaNode.services.identityService as PersistentIdentityService + identityService.database = originalIdentityService.database + identityService.start(originalIdentityService.trustRoot) + val keyService = E2ETestKeyManagementService(identityService) + keyService.start((bankOfCordaNode.services.keyManagementService as E2ETestKeyManagementService).keyPairs) + + // New identity for a notary (doesn't matter that it's for Bank Of Corda... since not going to use it as an actual notary etc). + val newKeyAndCert = keyService.freshKeyAndCert(bankOfCordaNode.info.legalIdentitiesAndCerts[0], false) + val randomNotary = Party(BOC_NAME, newKeyAndCert.owningKey) + + val future = bankOfCordaNode.startFlow(CashIssueFlow(expected, ref, randomNotary)) + mockNet.runNetwork() + val issueTx = future.getOrThrow().stx + val output = issueTx.tx.outputsOfType().single() + assertEquals(expected.`issued by`(bankOfCorda.ref(ref)), output.amount) + } +}