From 9a2ae8ae19369521c894337d6b9a16b2bdb8d7f4 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Tue, 28 Apr 2020 11:20:00 +0100 Subject: [PATCH] CORDA-3722 withEntityManager can rollback its session (#6187) * CORDA-3722 withEntityManager can rollback its session ## Summary Improve the handling of database transactions when using `withEntityManager` inside a flow. Extra changes have been included to improve the safety and correctness of Corda around handling database transactions. This focuses on allowing flows to catch errors that occur inside an entity manager and handle them accordingly. Errors can be caught in two places: - Inside `withEntityManager` - Outside `withEntityManager` Further changes have been included to ensure that transactions are rolled back correctly. ## Catching errors inside `withEntityManager` Errors caught inside `withEntityManager` require the flow to manually `flush` the current session (the entity manager's individual session). By manually flushing the session, a `try-catch` block can be placed around the `flush` call, allowing possible exceptions to be caught. Once an error is thrown from a call to `flush`, it is no longer possible to use the same entity manager to trigger any database operations. The only possible option is to rollback the changes from that session. The flow can continue executing updates within the same session but they will never be committed. What happens in this situation should be handled by the flow. Explicitly restricting the scenario requires a lot of effort and code. Instead, we should rely on the developer to control complex workflows. To continue updating the database after an error like this occurs, a new `withEntityManager` block should be used (after catching the previous error). ## Catching errors outside `withEntityManager` Exceptions can be caught around `withEntityManager` blocks. This allows errors to be handled in the same way as stated above, except the need to manually `flush` the session is removed. `withEntityManager` will automatically `flush` a session if it has not been marked for rollback due to an earlier error. A `try-catch` can then be placed around the whole of the `withEntityManager` block, allowing the error to be caught while not committing any changes to the underlying database transaction. ## Savepoints / Transactionality To make `withEntityManager` blocks work like mini database transactions, save points have been utilised. A new savepoint is created when opening a `withEntityManager` block (along with a new session). It is then used as a reference point to rollback to if the session errors and needs to roll back. The savepoint is then released (independently from completing successfully or failing). Using save points means, that either all the statements inside the entity manager are executed, or none of them are. ## Some implementation details - A new session is created every time an entity manager is requested, but this does not replace the flow's main underlying database session. - `CordaPersistence.transaction` can now determine whether it needs to execute its extra error handling code. This is needed to allow errors escape `withEntityManager` blocks while allowing some of our exception handling around subscribers (in `NodeVaultService`) to continue to work. --- .../kotlin/net/corda/core/node/ServiceHub.kt | 33 +- .../internal/persistence/CordaPersistence.kt | 31 +- .../persistence/DatabaseTransaction.kt | 18 +- .../persistence/RestrictedEntityManager.kt | 28 +- ...Test.kt => RestrictedEntityManagerTest.kt} | 14 +- .../flows/AbstractFlowEntityManagerTest.kt | 134 +++ .../node/flows/FlowEntityManagerNestedTest.kt | 374 +++++++++ .../flows/FlowEntityManagerStatementTest.kt | 309 +++++++ .../corda/node/flows/FlowEntityManagerTest.kt | 763 ++++++++++++++++++ .../net/corda/node/internal/AbstractNode.kt | 48 +- .../statemachine/ActionExecutorImpl.kt | 5 +- .../statemachine/TransitionExecutorImpl.kt | 5 +- .../AppendOnlyPersistentMapTest.kt | 3 + .../node/services/vault/VaultFlowTest.kt | 9 +- .../net/corda/testing/node/MockServices.kt | 4 +- 15 files changed, 1741 insertions(+), 37 deletions(-) rename node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/{RestrtictedEntityManagerTest.kt => RestrictedEntityManagerTest.kt} (76%) create mode 100644 node/src/integration-test/kotlin/net/corda/node/flows/AbstractFlowEntityManagerTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerNestedTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerStatementTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerTest.kt diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index bc25183d4d..d63b63edf4 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -381,7 +381,7 @@ interface ServiceHub : ServicesForResolution { * and thus queryable data will include everything committed as of the last checkpoint. * * We want to make sure users have a restricted access to administrative functions, this function will return a [RestrictedConnection] instance. - * The blocked methods are the following: + * The following methods are blocked: * - abort(executor: Executor?) * - clearWarnings() * - close() @@ -415,15 +415,22 @@ interface ServiceHub : ServicesForResolution { * @param block a lambda function with access to an [EntityManager]. * * We want to make sure users have a restricted access to administrative functions. - * The blocked methods are the following: + * The following methods are blocked: * - close() - * - clear() + * - unwrap(cls: Class?) + * - getDelegate(): Any * - getMetamodel() - * - getTransaction() * - joinTransaction() * - lock(entity: Any?, lockMode: LockModeType?) * - lock(entity: Any?, lockMode: LockModeType?, properties: MutableMap?) * - setProperty(propertyName: String?, value: Any?) + * + * getTransaction returns a [RestrictedEntityTransaction] to prevent unsafe manipulation of a flow's underlying + * database transaction. + * The following methods are blocked: + * - begin() + * - commit() + * - rollback() */ fun withEntityManager(block: EntityManager.() -> T): T @@ -435,6 +442,24 @@ interface ServiceHub : ServicesForResolution { * NOTE: Suspendable flow operations such as send, receive, subFlow and sleep, cannot be called within the lambda. * * @param block a lambda function with access to an [EntityManager]. + * + * We want to make sure users have a restricted access to administrative functions. + * The following methods are blocked: + * - close() + * - unwrap(cls: Class?) + * - getDelegate(): Any + * - getMetamodel() + * - joinTransaction() + * - lock(entity: Any?, lockMode: LockModeType?) + * - lock(entity: Any?, lockMode: LockModeType?, properties: MutableMap?) + * - setProperty(propertyName: String?, value: Any?) + * + * getTransaction returns a [RestrictedEntityTransaction] to prevent unsafe manipulation of a flow's underlying + * database transaction. + * The following methods are blocked: + * - begin() + * - commit() + * - rollback() */ fun withEntityManager(block: Consumer) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt index 69c16955e3..fa888c009d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt @@ -211,14 +211,15 @@ class CordaPersistence( * @param isolationLevel isolation level for the transaction. * @param statement to be executed in the scope of this transaction. */ - fun transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T = - transaction(isolationLevel, 2, false, statement) + fun transaction(isolationLevel: TransactionIsolationLevel, useErrorHandler: Boolean, statement: DatabaseTransaction.() -> T): T = + transaction(isolationLevel, 2, false, useErrorHandler, statement) /** * Executes given statement in the scope of transaction with the transaction level specified at the creation time. * @param statement to be executed in the scope of this transaction. */ - fun transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement) + @JvmOverloads + fun transaction(useErrorHandler: Boolean = true, statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, useErrorHandler, statement) /** * Executes given statement in the scope of transaction, with the given isolation level. @@ -228,7 +229,7 @@ class CordaPersistence( * @param statement to be executed in the scope of this transaction. */ fun transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, - recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T { + recoverAnyNestedSQLException: Boolean, useErrorHandler: Boolean, statement: DatabaseTransaction.() -> T): T { _contextDatabase.set(this) val outer = contextTransactionOrNull return if (outer != null) { @@ -237,26 +238,34 @@ class CordaPersistence( // previously been created by the flow state machine in ActionExecutorImpl#executeCreateTransaction // b. exceptions coming out from top level transactions are already being handled in CordaPersistence#inTopLevelTransaction // i.e. roll back and close the transaction - try { + if(useErrorHandler) { + outer.withErrorHandler(statement) + } else { outer.statement() - } catch (e: Exception) { - if (e is SQLException || e is PersistenceException || e is HospitalizeFlowException) { - outer.errorHandler(e) - } - throw e } } else { inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement) } } + private fun DatabaseTransaction.withErrorHandler(statement: DatabaseTransaction.() -> T): T { + return try { + statement() + } catch (e: Exception) { + if ((e is SQLException || e is PersistenceException || e is HospitalizeFlowException)) { + errorHandler(e) + } + throw e + } + } + /** * Executes given statement in the scope of transaction with the transaction level specified at the creation time. * @param statement to be executed in the scope of this transaction. * @param recoverableFailureTolerance number of transaction commit retries for SQL while SQL exception is encountered. */ fun transaction(recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T { - return transaction(defaultIsolationLevel, recoverableFailureTolerance, false, statement) + return transaction(defaultIsolationLevel, recoverableFailureTolerance, false, false, statement) } private fun inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt index d6d1425e33..be7fb1a4d0 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt @@ -39,9 +39,13 @@ class DatabaseTransaction( } // Returns a delegate which overrides certain operations that we do not want CorDapp developers to call. - val restrictedEntityManager: RestrictedEntityManager by lazy { - val entityManager = session as EntityManager - RestrictedEntityManager(entityManager) + + val entityManager: EntityManager get() { + // Always retrieve new session ([Session] implements [EntityManager]) + // Note, this does not replace the top level hibernate session + val session = database.entityManagerFactory.withOptions().connection(connection).openSession() + session.beginTransaction() + return session } val session: Session by sessionDelegate @@ -73,6 +77,10 @@ class DatabaseTransaction( throw DatabaseTransactionException(it) } if (sessionDelegate.isInitialized()) { + // The [sessionDelegate] must be initialised otherwise calling [entityManager] will cause an exception + if(session.transaction.rollbackOnly) { + throw RolledBackDatabaseSessionException() + } hibernateTransaction.commit() } connection.commit() @@ -124,4 +132,6 @@ class DatabaseTransaction( /** * Wrapper exception, for any exception registered as [DatabaseTransaction.firstExceptionInDatabaseTransaction]. */ -class DatabaseTransactionException(override val cause: Throwable): CordaRuntimeException(cause.message, cause) \ No newline at end of file +class DatabaseTransactionException(override val cause: Throwable): CordaRuntimeException(cause.message, cause) + +class RolledBackDatabaseSessionException : CordaRuntimeException("Attempted to commit database transaction marked for rollback") \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/RestrictedEntityManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/RestrictedEntityManager.kt index 8aebe2fb1f..1ea4f2c4fd 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/RestrictedEntityManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/RestrictedEntityManager.kt @@ -10,11 +10,19 @@ import javax.persistence.metamodel.Metamodel */ class RestrictedEntityManager(private val delegate: EntityManager) : EntityManager by delegate { + override fun getTransaction(): EntityTransaction { + return RestrictedEntityTransaction(delegate.transaction) + } + override fun close() { throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") } - override fun clear() { + override fun unwrap(cls: Class?): T { + throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") + } + + override fun getDelegate(): Any { throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") } @@ -22,10 +30,6 @@ class RestrictedEntityManager(private val delegate: EntityManager) : EntityManag throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") } - override fun getTransaction(): EntityTransaction? { - throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") - } - override fun joinTransaction() { throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") } @@ -41,5 +45,19 @@ class RestrictedEntityManager(private val delegate: EntityManager) : EntityManag override fun setProperty(propertyName: String?, value: Any?) { throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") } +} +class RestrictedEntityTransaction(private val delegate: EntityTransaction) : EntityTransaction by delegate { + + override fun rollback() { + throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") + } + + override fun commit() { + throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") + } + + override fun begin() { + throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.") + } } \ No newline at end of file diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/RestrtictedEntityManagerTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/RestrictedEntityManagerTest.kt similarity index 76% rename from node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/RestrtictedEntityManagerTest.kt rename to node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/RestrictedEntityManagerTest.kt index 79dae1279c..6f53ade01c 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/RestrtictedEntityManagerTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/RestrictedEntityManagerTest.kt @@ -1,12 +1,17 @@ package net.corda.nodeapi.internal.persistence +import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever import org.junit.Test import javax.persistence.EntityManager +import javax.persistence.EntityTransaction import javax.persistence.LockModeType +import kotlin.test.assertTrue -class RestrtictedEntityManagerTest { +class RestrictedEntityManagerTest { private val entitymanager = mock() + private val transaction = mock() private val restrictedEntityManager = RestrictedEntityManager(entitymanager) @Test(expected = UnsupportedOperationException::class, timeout=300_000) @@ -14,7 +19,7 @@ class RestrtictedEntityManagerTest { restrictedEntityManager.close() } - @Test(expected = UnsupportedOperationException::class, timeout=300_000) + @Test(timeout = 300_000) fun testClear() { restrictedEntityManager.clear() } @@ -24,9 +29,10 @@ class RestrtictedEntityManagerTest { restrictedEntityManager.getMetamodel() } - @Test(expected = UnsupportedOperationException::class, timeout=300_000) + @Test(timeout = 300_000) fun testGetTransaction() { - restrictedEntityManager.getTransaction() + whenever(entitymanager.transaction).doReturn(transaction) + assertTrue(restrictedEntityManager.transaction is RestrictedEntityTransaction) } @Test(expected = UnsupportedOperationException::class, timeout=300_000) diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/AbstractFlowEntityManagerTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/AbstractFlowEntityManagerTest.kt new file mode 100644 index 0000000000..46faabf82d --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/flows/AbstractFlowEntityManagerTest.kt @@ -0,0 +1,134 @@ +package net.corda.node.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow +import net.corda.core.schemas.MappedSchema +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.node.services.statemachine.StaffedFlowHospital +import org.junit.Before +import java.util.concurrent.Semaphore +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Id +import javax.persistence.Table +import kotlin.test.assertEquals + +abstract class AbstractFlowEntityManagerTest { + + protected companion object { + + const val TABLE_NAME = "entity_manager_custom_table" + + val entityWithIdOne = CustomTableEntity(1, "Dan", "This won't work") + val anotherEntityWithIdOne = CustomTableEntity(1, "Rick", "I'm pretty sure this will work") + val entityWithIdTwo = CustomTableEntity(2, "Ivan", "This will break existing CorDapps") + val entityWithIdThree = CustomTableEntity(3, "Some other guy", "What am I doing here?") + } + + @CordaSerializable + enum class CommitStatus { INTERMEDIATE_COMMIT, NO_INTERMEDIATE_COMMIT } + + @Before + open fun before() { + StaffedFlowHospital.onFlowDischarged.clear() + StaffedFlowHospital.onFlowKeptForOvernightObservation.clear() + StaffedFlowHospital.onFlowKeptForOvernightObservation.clear() + } + + protected inline fun > CordaRPCOps.expectFlowFailureAndAssertCreatedEntities( + crossinline flow: (CommitStatus) -> R, + commitStatus: CommitStatus, + numberOfDischarges: Int, + numberOfExpectedEntities: Int + ): Int { + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + val lock = Semaphore(0) + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> lock.release() } + startFlow(flow, commitStatus) + lock.acquire() + assertEquals( + numberOfDischarges, + counter, + "[$commitStatus] expected the flow to be discharged from hospital $numberOfDischarges time(s)" + ) + val numberOfEntities = startFlow(::GetCustomEntities).returnValue.getOrThrow().size + assertEquals( + numberOfExpectedEntities, + numberOfEntities, + "[$commitStatus] expected $numberOfExpectedEntities to be saved" + ) + startFlow(::DeleteCustomEntities).returnValue.getOrThrow(30.seconds) + return numberOfEntities + } + + protected inline fun > CordaRPCOps.expectFlowSuccessAndAssertCreatedEntities( + crossinline flow: (CommitStatus) -> R, + commitStatus: CommitStatus, + numberOfDischarges: Int, + numberOfExpectedEntities: Int + ): Int { + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + startFlow(flow, commitStatus).returnValue.getOrThrow(30.seconds) + assertEquals( + numberOfDischarges, + counter, + "[$commitStatus] expected the flow to be discharged from hospital $numberOfDischarges time(s)" + ) + val numberOfEntities = startFlow(::GetCustomEntities).returnValue.getOrThrow().size + assertEquals( + numberOfExpectedEntities, + numberOfEntities, + "[$commitStatus] expected $numberOfExpectedEntities to be saved" + ) + startFlow(::DeleteCustomEntities).returnValue.getOrThrow(30.seconds) + return numberOfEntities + } + + @StartableByRPC + class GetCustomEntities : FlowLogic>() { + @Suspendable + override fun call(): List { + return serviceHub.withEntityManager { + val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java) + criteria.select(criteria.from(CustomTableEntity::class.java)) + createQuery(criteria).resultList + } + } + } + + @StartableByRPC + class DeleteCustomEntities : FlowLogic() { + @Suspendable + override fun call() { + serviceHub.withEntityManager { + val delete = criteriaBuilder.createCriteriaDelete(CustomTableEntity::class.java) + delete.from(CustomTableEntity::class.java) + createQuery(delete).executeUpdate() + } + } + } + + @Entity + @Table(name = TABLE_NAME) + @CordaSerializable + data class CustomTableEntity constructor( + @Id + @Column(name = "id", nullable = false) + var id: Int, + @Column(name = "name", nullable = false) + var name: String, + @Column(name = "quote", nullable = false) + var quote: String + ) + + object CustomSchema + + object CustomMappedSchema : MappedSchema(CustomSchema::class.java, 1, listOf(CustomTableEntity::class.java)) +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerNestedTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerNestedTest.kt new file mode 100644 index 0000000000..73930aec6c --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerNestedTest.kt @@ -0,0 +1,374 @@ +package net.corda.node.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.millis +import net.corda.core.utilities.seconds +import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import org.junit.Test +import javax.persistence.PersistenceException +import kotlin.test.assertEquals + +class FlowEntityManagerNestedTest : AbstractFlowEntityManagerTest() { + + @Test(timeout = 300_000) + fun `entity manager inside an entity manager saves all data`() { + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + + alice.rpc.startFlow(::EntityManagerInsideAnEntityManagerFlow).returnValue.getOrThrow(20.seconds) + assertEquals(0, counter) + val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow() + assertEquals(2, entities.size) + } + } + + @Test(timeout = 300_000) + fun `entity manager inside an entity manager that throws an error does not save any data`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 0 + ) + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `entity manager that saves an entity with an entity manager inside it that throws an error after saving the entity does not save any data`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 0 + ) + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it around the entity manager after saving the entity saves the data from the external entity manager`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 2 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 2 + ) + } + } + + @Test(timeout = 300_000) + fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it inside the entity manager after saving the entity saves the data from the external entity manager`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 2 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 2 + ) + } + } + + @Test(timeout = 300_000) + fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it around the entity manager before saving the entity saves the data from the external entity manager`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 2 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 2 + ) + } + } + + @Test(timeout = 300_000) + fun `entity manager with an entity manager inside it saves an entity, outer throws and catches the error outside itself after saving the entity does not save the data from the internal entity manager`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `entity manager with an entity manager inside it saves an entity, outer throws and catches the error inside itself after saving the entity does not save the data from the internal entity manager`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @StartableByRPC + class EntityManagerInsideAnEntityManagerFlow : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + serviceHub.withEntityManager { + persist(entityWithIdTwo) + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow(private val commitStatus: CommitStatus) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(entityWithIdTwo) + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow( + private val commitStatus: CommitStatus + ) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(entityWithIdTwo) + try { + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + } + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow( + private val commitStatus: CommitStatus + ) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(entityWithIdTwo) + serviceHub.withEntityManager { + try { + persist(anotherEntityWithIdOne) + flush() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + } + + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow( + private val commitStatus: CommitStatus + ) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + try { + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + } + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + persist(entityWithIdTwo) + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow( + private val commitStatus: CommitStatus + ) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + try { + serviceHub.withEntityManager { + serviceHub.withEntityManager { + persist(entityWithIdTwo) + } + persist(anotherEntityWithIdOne) + } + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow( + private val commitStatus: CommitStatus + ) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + serviceHub.withEntityManager { + persist(entityWithIdTwo) + } + try { + persist(anotherEntityWithIdOne) + flush() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + } + sleep(1.millis) + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerStatementTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerStatementTest.kt new file mode 100644 index 0000000000..a6e2c8946d --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerStatementTest.kt @@ -0,0 +1,309 @@ +package net.corda.node.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.millis +import net.corda.core.utilities.seconds +import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import org.hibernate.exception.ConstraintViolationException +import org.junit.Test +import javax.persistence.PersistenceException +import kotlin.test.assertEquals + +class FlowEntityManagerStatementTest : AbstractFlowEntityManagerTest() { + + @Test(timeout = 300_000) + fun `data can be saved by a sql statement using entity manager`() { + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + + alice.rpc.startFlow(::EntityManagerSqlFlow).returnValue.getOrThrow(20.seconds) + assertEquals(0, counter) + val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow() + assertEquals(1, entities.size) + } + } + + @Test(timeout = 300_000) + fun `constraint violation caused by a sql statement should save no data`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerErrorFromSqlFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 0 + ) + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerErrorFromSqlFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation caused by a sql statement that is caught inside an entity manager block saves none of the data inside of it`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + // 1 entity saved from the first entity manager block that does not get rolled back + // even if there is no intermediate commit to the database + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation caused by a sql statement that is caught outside an entity manager block saves none of the data inside of it`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + // 1 entity saved from the first entity manager block that does not get rolled back + // even if there is no intermediate commit to the database + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation caused by a sql statement that is caught inside an entity manager and more data is saved afterwards inside the same entity manager should not save the extra data`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation caused by a sql statement that is caught inside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 2 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 2 + ) + } + } + + @StartableByRPC + class EntityManagerSqlFlow : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)") + .setParameter("id", anotherEntityWithIdOne.id) + .setParameter("name", anotherEntityWithIdOne.name) + .setParameter("quote", anotherEntityWithIdOne.name) + .executeUpdate() + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerErrorFromSqlFlow(private val commitStatus: CommitStatus) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)") + .setParameter("id", anotherEntityWithIdOne.id) + .setParameter("name", anotherEntityWithIdOne.name) + .setParameter("quote", anotherEntityWithIdOne.name) + .executeUpdate() + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow(private val commitStatus: CommitStatus) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + try { + createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)") + .setParameter("id", anotherEntityWithIdOne.id) + .setParameter("name", anotherEntityWithIdOne.name) + .setParameter("quote", anotherEntityWithIdOne.name) + .executeUpdate() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow(private val commitStatus: CommitStatus) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + try { + serviceHub.withEntityManager { + createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)") + .setParameter("id", anotherEntityWithIdOne.id) + .setParameter("name", anotherEntityWithIdOne.name) + .setParameter("quote", anotherEntityWithIdOne.name) + .executeUpdate() + } + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow(private val commitStatus: CommitStatus) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + try { + createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)") + .setParameter("id", anotherEntityWithIdOne.id) + .setParameter("name", anotherEntityWithIdOne.name) + .setParameter("quote", anotherEntityWithIdOne.name) + .executeUpdate() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + // These entities are not saved since the transaction is marked for rollback + try { + createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)") + .setParameter("id", entityWithIdTwo.id) + .setParameter("name", entityWithIdTwo.name) + .setParameter("quote", entityWithIdTwo.name) + .executeUpdate() + } catch (e: PersistenceException) { + if (e.cause is ConstraintViolationException) { + throw e + } else { + logger.info( + """ + Caught exception from second sql statement inside the same broken entity manager + This happens if the database has thrown an exception due to rolling back the db transaction + """.trimIndent(), e + ) + } + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow(private val commitStatus: CommitStatus) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + try { + serviceHub.withEntityManager { + createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)") + .setParameter("id", anotherEntityWithIdOne.id) + .setParameter("name", anotherEntityWithIdOne.name) + .setParameter("quote", anotherEntityWithIdOne.name) + .executeUpdate() + + } + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + serviceHub.withEntityManager { + val query = createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)") + .setParameter("id", entityWithIdTwo.id) + .setParameter("name", entityWithIdTwo.name) + .setParameter("quote", entityWithIdTwo.name) + query.executeUpdate() + } + sleep(1.millis) + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerTest.kt new file mode 100644 index 0000000000..f8c68f0cb6 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerTest.kt @@ -0,0 +1,763 @@ +package net.corda.node.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.CollectSignaturesFlow +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveFinalityFlow +import net.corda.core.flows.SignTransactionFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.messaging.startFlow +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.millis +import net.corda.core.utilities.seconds +import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.testing.contracts.DummyState +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.DummyCommandData +import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import org.hibernate.exception.ConstraintViolationException +import org.junit.Before +import org.junit.Test +import java.sql.Connection +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.Semaphore +import javax.persistence.PersistenceException +import kotlin.test.assertEquals + +class FlowEntityManagerTest : AbstractFlowEntityManagerTest() { + + @Before + override fun before() { + MyService.includeRawUpdates = false + super.before() + } + + @Test(timeout = 300_000) + fun `entities can be saved using entity manager without a flush`() { + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.startFlow(::EntityManagerSaveEntitiesWithoutAFlushFlow) + .returnValue.getOrThrow(30.seconds) + assertEquals(0, counter) + val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow() + assertEquals(3, entities.size) + } + } + + @Test(timeout = 300_000) + fun `entities can be saved using entity manager with a flush`() { + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + + alice.rpc.startFlow(::EntityManagerSaveEntitiesWithAFlushFlow) + .returnValue.getOrThrow(30.seconds) + assertEquals(0, counter) + val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow() + assertEquals(3, entities.size) + } + } + + @Test(timeout = 300_000) + fun `entities saved inside an entity manager are only committed when a flow suspends`() { + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + + var beforeCommitEntities: List? = null + EntityManagerSaveEntitiesWithoutAFlushFlow.beforeCommitHook = { + beforeCommitEntities = it + } + var afterCommitEntities: List? = null + EntityManagerSaveEntitiesWithoutAFlushFlow.afterCommitHook = { + afterCommitEntities = it + } + + alice.rpc.startFlow(::EntityManagerSaveEntitiesWithoutAFlushFlow) + .returnValue.getOrThrow(30.seconds) + assertEquals(0, counter) + val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow() + assertEquals(3, entities.size) + assertEquals(0, beforeCommitEntities!!.size) + assertEquals(3, afterCommitEntities!!.size) + } + } + + @Test(timeout = 300_000) + fun `constraint violation without a flush breaks`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerErrorWithoutAFlushFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 0 + ) + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerErrorWithoutAFlushFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation with a flush breaks`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerErrorWithAFlushFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 0 + ) + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerErrorWithAFlushFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation with a flush that is caught inside an entity manager block saves none of the data inside of it`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + // 1 entity saved from the first entity manager block that does not get rolled back + // even if there is no intermediate commit to the database + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation with a flush that is caught outside the entity manager block saves none of the data inside of it`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + // 1 entity saved from the first entity manager block that does not get rolled back + // even if there is no intermediate commit to the database + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation within a single entity manager block throws an exception and saves no data`() { + var dischargeCounter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++dischargeCounter } + val lock = Semaphore(0) + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> lock.release() } + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.startFlow(::EntityManagerErrorInsideASingleEntityManagerFlow) + lock.acquire() + // Goes straight to observation due to throwing [EntityExistsException] + assertEquals(0, dischargeCounter) + val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow() + assertEquals(0, entities.size) + } + } + + @Test(timeout = 300_000) + fun `constraint violation on a single entity when saving multiple entities throws an exception and does not save any data within the entity manager block`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerSavingMultipleEntitiesWithASingleErrorFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 0 + ) + alice.rpc.expectFlowFailureAndAssertCreatedEntities( + flow = ::EntityManagerSavingMultipleEntitiesWithASingleErrorFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 3, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation on a single entity when saving multiple entities and catching the error does not save any data within the entity manager block`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + // 1 entity saved from the first entity manager block that does not get rolled back + // even if there is no intermediate commit to the database + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation that is caught inside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 3 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 3 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation that is caught inside an entity manager and more data is saved afterwards inside the same entity manager should not save the extra data`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 1 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation that is caught outside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager, + commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 3 + ) + alice.rpc.expectFlowSuccessAndAssertCreatedEntities( + flow = ::EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager, + commitStatus = CommitStatus.INTERMEDIATE_COMMIT, + numberOfDischarges = 0, + numberOfExpectedEntities = 3 + ) + } + } + + @Test(timeout = 300_000) + fun `constraint violation that is caught inside an entity manager should allow a flow to continue processing as normal`() { + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + driver(DriverParameters(startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + val bob = startNode(providedName = BOB_NAME).getOrThrow() + + val txId = + alice.rpc.startFlow(::EntityManagerWithFlushCatchAndInteractWithOtherPartyFlow, bob.nodeInfo.singleIdentity()) + .returnValue.getOrThrow(20.seconds) + assertEquals(0, counter) + val txFromVault = alice.rpc.stateMachineRecordedTransactionMappingSnapshot().firstOrNull()?.transactionId + assertEquals(txId, txFromVault) + val entity = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow().single() + assertEquals(entityWithIdOne, entity) + } + } + + @Test(timeout = 300_000) + fun `data saved from an entity manager vault update should be visible within an entity manager block inside the same database transaction`() { + MyService.includeRawUpdates = true + MyService.insertionType = MyService.InsertionType.ENTITY_MANAGER + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + driver(DriverParameters(startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + + val entities = + alice.rpc.startFlow(::EntityManagerWithinTheSameDatabaseTransactionFlow).returnValue.getOrThrow(20.seconds) + assertEquals(3, entities.size) + assertEquals(0, counter) + } + } + + @Test(timeout = 300_000) + fun `data saved from a jdbc connection vault update should be visible within an entity manager block inside the same database transaction`() { + MyService.includeRawUpdates = true + MyService.insertionType = MyService.InsertionType.CONNECTION + var counter = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } + driver(DriverParameters(startNodesInProcess = true)) { + + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + + val entities = + alice.rpc.startFlow(::EntityManagerWithinTheSameDatabaseTransactionFlow).returnValue.getOrThrow(20.seconds) + assertEquals(3, entities.size) + assertEquals(0, counter) + } + } + + @StartableByRPC + class EntityManagerSaveEntitiesWithoutAFlushFlow : FlowLogic() { + + companion object { + var beforeCommitHook: ((entities: List) -> Unit)? = null + var afterCommitHook: ((entities: List) -> Unit)? = null + } + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + persist(entityWithIdTwo) + persist(entityWithIdThree) + } + beforeCommitHook?.invoke(serviceHub.cordaService(MyService::class.java).getEntities()) + sleep(1.millis) + afterCommitHook?.invoke(serviceHub.cordaService(MyService::class.java).getEntities()) + } + } + + @StartableByRPC + class EntityManagerSaveEntitiesWithAFlushFlow : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + persist(entityWithIdTwo) + persist(entityWithIdThree) + flush() + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerErrorWithoutAFlushFlow(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerErrorWithAFlushFlow(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + flush() + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + try { + flush() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + try { + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + flush() + } + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerErrorInsideASingleEntityManagerFlow : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + persist(anotherEntityWithIdOne) + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerSavingMultipleEntitiesWithASingleErrorFlow(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + persist(entityWithIdTwo) + persist(entityWithIdThree) + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + persist(entityWithIdTwo) + persist(entityWithIdThree) + try { + flush() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + try { + flush() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(entityWithIdTwo) + persist(entityWithIdThree) + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager(private val commitStatus: CommitStatus) : FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + try { + flush() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + // These entities are not saved since the transaction is marked for rollback + try { + persist(entityWithIdTwo) + persist(entityWithIdThree) + } catch (e: PersistenceException) { + if (e.cause is ConstraintViolationException) { + throw e + } else { + logger.info( + """ + Caught exception from second set of persists inside the same broken entity manager + This happens if the database has thrown an exception due to rolling back the db transaction + """.trimIndent(), e + ) + } + } + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager(private val commitStatus: CommitStatus) : + FlowLogic() { + + @Suspendable + override fun call() { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + try { + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + } + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) { + sleep(1.millis) + } + serviceHub.withEntityManager { + persist(entityWithIdTwo) + persist(entityWithIdThree) + } + sleep(1.millis) + } + } + + @StartableByRPC + class EntityManagerWithFlushCatchAndInteractWithOtherPartyFlow(private val party: Party) : FlowLogic() { + + @Suspendable + override fun call(): SecureHash { + serviceHub.withEntityManager { + persist(entityWithIdOne) + } + serviceHub.withEntityManager { + persist(anotherEntityWithIdOne) + try { + flush() + } catch (e: PersistenceException) { + logger.info("Caught the exception!") + } + } + return subFlow(CreateATransactionFlow(party)) + } + } + + @InitiatingFlow + class CreateATransactionFlow(val party: Party) : FlowLogic() { + @Suspendable + override fun call(): SecureHash { + val session = initiateFlow(party) + val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply { + addOutputState(DummyState(participants = listOf(ourIdentity, party))) + addCommand(DummyCommandData, ourIdentity.owningKey, party.owningKey) + } + val stx = serviceHub.signInitialTransaction(tx) + val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session))) + return subFlow(FinalityFlow(ftx, session)).id + } + } + + @InitiatedBy(CreateATransactionFlow::class) + class CreateATransactionResponder(val session: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val stx = subFlow(object : SignTransactionFlow(session) { + override fun checkTransaction(stx: SignedTransaction) { + } + }) + subFlow(ReceiveFinalityFlow(session, stx.id)) + } + } + + @StartableByRPC + class EntityManagerWithinTheSameDatabaseTransactionFlow : FlowLogic>() { + + @Suspendable + override fun call(): List { + val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply { + addOutputState(DummyState(participants = listOf(ourIdentity))) + addCommand(DummyCommandData, ourIdentity.owningKey) + } + val stx = serviceHub.signInitialTransaction(tx) + serviceHub.recordTransactions(stx) + return serviceHub.withEntityManager { + val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java) + criteria.select(criteria.from(CustomTableEntity::class.java)) + createQuery(criteria).resultList + } + } + } + + @CordaService + class MyService(private val services: AppServiceHub) : SingletonSerializeAsToken() { + + companion object { + var includeRawUpdates = false + var insertionType = InsertionType.ENTITY_MANAGER + } + + enum class InsertionType { ENTITY_MANAGER, CONNECTION } + + val executors: ExecutorService = Executors.newFixedThreadPool(1) + + init { + if (includeRawUpdates) { + services.register { + services.vaultService.rawUpdates.subscribe { + if (insertionType == InsertionType.ENTITY_MANAGER) { + services.withEntityManager { + persist(entityWithIdOne) + persist(entityWithIdTwo) + persist(entityWithIdThree) + } + } else { + services.jdbcSession().run { + insert(entityWithIdOne) + insert(entityWithIdTwo) + insert(entityWithIdThree) + } + } + } + } + } + } + + private fun Connection.insert(entity: CustomTableEntity) { + prepareStatement("INSERT INTO $TABLE_NAME VALUES (?, ?, ?)").apply { + setInt(1, entity.id) + setString(2, entity.name) + setString(3, entity.quote) + }.executeUpdate() + } + + fun getEntities(): List { + return executors.submit> { + services.database.transaction { + session.run { + val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java) + criteria.select(criteria.from(CustomTableEntity::class.java)) + createQuery(criteria).resultList + } + } + }.get() + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index ed6bf6e5eb..afaf40c36e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -160,8 +160,10 @@ import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException +import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.OutstandingDatabaseChangesException import net.corda.nodeapi.internal.persistence.RestrictedConnection +import net.corda.nodeapi.internal.persistence.RestrictedEntityManager import net.corda.nodeapi.internal.persistence.SchemaMigration import net.corda.tools.shell.InteractiveShell import org.apache.activemq.artemis.utils.ReusableLatch @@ -176,6 +178,7 @@ import java.security.KeyPair import java.security.KeyStoreException import java.security.cert.X509Certificate import java.sql.Connection +import java.sql.Savepoint import java.time.Clock import java.time.Duration import java.time.format.DateTimeParseException @@ -189,6 +192,7 @@ import java.util.concurrent.TimeUnit.MINUTES import java.util.concurrent.TimeUnit.SECONDS import java.util.function.Consumer import javax.persistence.EntityManager +import javax.persistence.PersistenceException import kotlin.collections.ArrayList /** @@ -1192,12 +1196,52 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override fun jdbcSession(): Connection = RestrictedConnection(database.createSession()) override fun withEntityManager(block: EntityManager.() -> T): T { - return database.transaction { + return database.transaction(useErrorHandler = false) { session.flush() - block(restrictedEntityManager) + val manager = entityManager + withSavePoint { savepoint -> + // Restrict what entity manager they can use inside the block + try { + block(RestrictedEntityManager(manager)).also { + if (!manager.transaction.rollbackOnly) { + manager.flush() + } else { + connection.rollback(savepoint) + } + } + } catch (e: PersistenceException) { + if (manager.transaction.rollbackOnly) { + connection.rollback(savepoint) + } + throw e + } finally { + manager.close() + } + } } } + private fun DatabaseTransaction.withSavePoint(block: (savepoint: Savepoint) -> T) : T { + val savepoint = connection.setSavepoint() + return try { + block(savepoint) + } finally { + // Release the save point even if we occur an error + if (savepoint.supportsReleasing()) { + connection.releaseSavepoint(savepoint) + } + } + } + + /** + * Not all databases support releasing of savepoints. + * The type of savepoints are referenced by string names since we do not have access to the JDBC drivers + * at compile time. + */ + private fun Savepoint.supportsReleasing(): Boolean { + return this::class.simpleName != "SQLServerSavepoint" && this::class.simpleName != "OracleSavepoint" + } + override fun withEntityManager(block: Consumer) { withEntityManager { block.accept(this) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 882f961876..8489eecb33 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -229,7 +229,10 @@ class ActionExecutorImpl( @Suspendable private fun executeRollbackTransaction() { - contextTransactionOrNull?.close() + contextTransactionOrNull?.run { + rollback() + close() + } } @Suspendable diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt index 07aa887112..e5ca0d3f84 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt @@ -43,7 +43,10 @@ class TransitionExecutorImpl( try { actionExecutor.executeAction(fiber, action) } catch (exception: Exception) { - contextTransactionOrNull?.close() + contextTransactionOrNull?.run { + rollback() + close() + } if (transition.newState.checkpoint.errorState is ErrorState.Errored) { // If we errored while transitioning to an error state then we cannot record the additional // error as that may result in an infinite loop, e.g. error propagation fails -> record error -> propagate fails again. diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/AppendOnlyPersistentMapTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/AppendOnlyPersistentMapTest.kt index acfefd9c8e..b596de6757 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/AppendOnlyPersistentMapTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/AppendOnlyPersistentMapTest.kt @@ -5,6 +5,7 @@ import net.corda.core.utilities.loggerFor import net.corda.node.services.schema.NodeSchemaService import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.nodeapi.internal.persistence.RolledBackDatabaseSessionException import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties @@ -180,6 +181,8 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) { } catch (t: PersistenceException) { // This only helps if thrown on commit, otherwise other latches not counted down. assertEquals(t.message, Outcome.SuccessButErrorOnCommit, a.outcome) + } catch (t: RolledBackDatabaseSessionException) { + assertEquals(t.message, Outcome.SuccessButErrorOnCommit, a.outcome) } a.await(a::phase4) b.await(b::phase4) diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt index 361d8f296e..993731e15b 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt @@ -1,7 +1,12 @@ package net.corda.node.services.vault import co.paralleluniverse.fibers.Suspendable -import net.corda.core.flows.* +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveFinalityFlow import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.node.services.queryBy @@ -50,13 +55,11 @@ class VaultFlowTest { @After fun tearDown() { mockNetwork.stopNodes() - StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear() StaffedFlowHospital.onFlowKeptForOvernightObservation.clear() } @Test(timeout=300_000) fun `Unique column constraint failing causes states to not persist to vaults`() { - StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add( { t: Throwable -> t is javax.persistence.PersistenceException }) partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get() val hospitalLatch = CountDownLatch(1) StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> hospitalLatch.countDown() } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index ab767c3ff2..b259a2aa1d 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -236,11 +236,11 @@ open class MockServices private constructor( override fun jdbcSession(): Connection = persistence.createSession() override fun withEntityManager(block: EntityManager.() -> T): T { - return block(contextTransaction.restrictedEntityManager) + return block(contextTransaction.entityManager) } override fun withEntityManager(block: Consumer) { - return block.accept(contextTransaction.restrictedEntityManager) + return block.accept(contextTransaction.entityManager) } } }