CORDA-3722 withEntityManager can rollback its session (#6187)

* CORDA-3722 withEntityManager can rollback its session

Improve the handling of database transactions when using
`withEntityManager` inside a flow.

Extra changes have been included to improve the safety and
correctness of Corda around handling database transactions.

This focuses on allowing flows to catch errors that occur inside an
entity manager and handle them accordingly.

Errors can be caught in two places:

- Inside `withEntityManager`
- Outside `withEntityManager`

Further changes have been included to ensure that transactions are
rolled back correctly.

Errors caught inside `withEntityManager` require the flow to manually
`flush` the current session (the entity manager's individual session).
By manually flushing the session, a `try-catch` block can be placed
around the `flush` call, allowing possible exceptions to be caught.

Once an error is thrown from a call to `flush`, it is no longer possible
to use the same entity manager to trigger any database operations. The
only possible option is to rollback the changes from that session.
The flow can continue executing updates within the same session but they
will never be committed. What happens in this situation should be handled
by the flow. Explicitly restricting the scenario requires a lot of effort
and code. Instead, we should rely on the developer to control complex
workflows.

To continue updating the database after an error like this occurs, a new
`withEntityManager` block should be used (after catching the previous
error).

Exceptions can be caught around `withEntityManager` blocks. This allows
errors to be handled in the same way as stated above, except the need to
manually `flush` the session is removed. `withEntityManager` will
automatically `flush` a session if it has not been marked for rollback
due to an earlier error.

A `try-catch` can then be placed around the whole of the
`withEntityManager` block, allowing the error to be caught while not
committing any changes to the underlying database transaction.

To make `withEntityManager` blocks work like mini database transactions,
save points have been utilised. A new savepoint is created when opening
a `withEntityManager` block (along with a new session). It is then used
as a reference point to rollback to if the session errors and needs to
roll back. The savepoint is then released (independently from
completing successfully or failing).

Using save points means, that either all the statements inside the
entity manager are executed, or none of them are.

- A new session is created every time an entity manager is requested,
but this does not replace the flow's main underlying database session.
- `CordaPersistence.transaction` can now determine whether it needs
to execute its extra error handling code. This is needed to allow errors
escape `withEntityManager` blocks while allowing some of our exception
handling around subscribers (in `NodeVaultService`) to continue to work.
This commit is contained in:
Dan Newton 2020-04-28 11:20:00 +01:00 committed by LankyDan
parent f6b5737277
commit efd633c7b9
14 changed files with 1832 additions and 25 deletions

View File

@ -380,6 +380,26 @@ interface ServiceHub : ServicesForResolution {
* When used within a flow, this session automatically forms part of the enclosing flow transaction boundary,
* and thus queryable data will include everything committed as of the last checkpoint.
*
* We want to make sure users have a restricted access to administrative functions, this function will return a [RestrictedConnection] instance.
* The blocked methods are the following:
* - abort(executor: Executor?)
* - clearWarnings()
* - close()
* - commit()
* - setSavepoint()
* - setSavepoint(name : String?)
* - releaseSavepoint(savepoint: Savepoint?)
* - rollback()
* - rollback(savepoint: Savepoint?)
* - setCatalog(catalog : String?)
* - setTransactionIsolation(level: Int)
* - setTypeMap(map: MutableMap<String, Class<*>>?)
* - setHoldability(holdability: Int)
* - setSchema(schema: String?)
* - setNetworkTimeout(executor: Executor?, milliseconds: Int)
* - setAutoCommit(autoCommit: Boolean)
* - setReadOnly(readOnly: Boolean)
*
* @throws IllegalStateException if called outside of a transaction.
* @return A [Connection]
*/
@ -393,6 +413,24 @@ interface ServiceHub : ServicesForResolution {
* NOTE: Suspendable flow operations such as send, receive, subFlow and sleep, cannot be called within the lambda.
*
* @param block a lambda function with access to an [EntityManager].
*
* We want to make sure users have a restricted access to administrative functions.
* The following methods are blocked:
* - close()
* - unwrap(cls: Class<T>?)
* - getDelegate(): Any
* - getMetamodel()
* - joinTransaction()
* - lock(entity: Any?, lockMode: LockModeType?)
* - lock(entity: Any?, lockMode: LockModeType?, properties: MutableMap<String, Any>?)
* - setProperty(propertyName: String?, value: Any?)
*
* getTransaction returns a [RestrictedEntityTransaction] to prevent unsafe manipulation of a flow's underlying
* database transaction.
* The following methods are blocked:
* - begin()
* - commit()
* - rollback()
*/
fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T
@ -404,6 +442,24 @@ interface ServiceHub : ServicesForResolution {
* NOTE: Suspendable flow operations such as send, receive, subFlow and sleep, cannot be called within the lambda.
*
* @param block a lambda function with access to an [EntityManager].
*
* We want to make sure users have a restricted access to administrative functions.
* The following methods are blocked:
* - close()
* - unwrap(cls: Class<T>?)
* - getDelegate(): Any
* - getMetamodel()
* - joinTransaction()
* - lock(entity: Any?, lockMode: LockModeType?)
* - lock(entity: Any?, lockMode: LockModeType?, properties: MutableMap<String, Any>?)
* - setProperty(propertyName: String?, value: Any?)
*
* getTransaction returns a [RestrictedEntityTransaction] to prevent unsafe manipulation of a flow's underlying
* database transaction.
* The following methods are blocked:
* - begin()
* - commit()
* - rollback()
*/
fun withEntityManager(block: Consumer<EntityManager>)

View File

@ -211,14 +211,15 @@ class CordaPersistence(
* @param isolationLevel isolation level for the transaction.
* @param statement to be executed in the scope of this transaction.
*/
fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T =
transaction(isolationLevel, 2, false, statement)
fun <T> transaction(isolationLevel: TransactionIsolationLevel, useErrorHandler: Boolean, statement: DatabaseTransaction.() -> T): T =
transaction(isolationLevel, 2, false, useErrorHandler, statement)
/**
* Executes given statement in the scope of transaction with the transaction level specified at the creation time.
* @param statement to be executed in the scope of this transaction.
*/
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement)
@JvmOverloads
fun <T> transaction(useErrorHandler: Boolean = true, statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, useErrorHandler, statement)
/**
* Executes given statement in the scope of transaction, with the given isolation level.
@ -228,7 +229,7 @@ class CordaPersistence(
* @param statement to be executed in the scope of this transaction.
*/
fun <T> transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int,
recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T {
recoverAnyNestedSQLException: Boolean, useErrorHandler: Boolean, statement: DatabaseTransaction.() -> T): T {
_contextDatabase.set(this)
val outer = contextTransactionOrNull
return if (outer != null) {
@ -237,26 +238,34 @@ class CordaPersistence(
// previously been created by the flow state machine in ActionExecutorImpl#executeCreateTransaction
// b. exceptions coming out from top level transactions are already being handled in CordaPersistence#inTopLevelTransaction
// i.e. roll back and close the transaction
try {
if(useErrorHandler) {
outer.withErrorHandler(statement)
} else {
outer.statement()
} catch (e: Exception) {
if (e is SQLException || e is PersistenceException || e is HospitalizeFlowException) {
outer.errorHandler(e)
}
throw e
}
} else {
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
}
}
private fun <T> DatabaseTransaction.withErrorHandler(statement: DatabaseTransaction.() -> T): T {
return try {
statement()
} catch (e: Exception) {
if ((e is SQLException || e is PersistenceException || e is HospitalizeFlowException)) {
errorHandler(e)
}
throw e
}
}
/**
* Executes given statement in the scope of transaction with the transaction level specified at the creation time.
* @param statement to be executed in the scope of this transaction.
* @param recoverableFailureTolerance number of transaction commit retries for SQL while SQL exception is encountered.
*/
fun <T> transaction(recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
return transaction(defaultIsolationLevel, recoverableFailureTolerance, false, statement)
return transaction(defaultIsolationLevel, recoverableFailureTolerance, false, false, statement)
}
private fun <T> inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int,

View File

@ -40,9 +40,13 @@ class DatabaseTransaction(
}
// Returns a delegate which overrides certain operations that we do not want CorDapp developers to call.
val restrictedEntityManager: RestrictedEntityManager by lazy {
val entityManager = session as EntityManager
RestrictedEntityManager(entityManager)
val entityManager: EntityManager get() {
// Always retrieve new session ([Session] implements [EntityManager])
// Note, this does not replace the top level hibernate session
val session = database.entityManagerFactory.withOptions().connection(connection).openSession()
session.beginTransaction()
return session
}
val session: Session by sessionDelegate
@ -74,6 +78,10 @@ class DatabaseTransaction(
throw DatabaseTransactionException(it)
}
if (sessionDelegate.isInitialized()) {
// The [sessionDelegate] must be initialised otherwise calling [entityManager] will cause an exception
if(session.transaction.rollbackOnly) {
throw RolledBackDatabaseSessionException()
}
hibernateTransaction.commit()
}
connection.commit()
@ -130,4 +138,6 @@ class DatabaseTransaction(
/**
* Wrapper exception, for any exception registered as [DatabaseTransaction.firstExceptionInDatabaseTransaction].
*/
class DatabaseTransactionException(override val cause: Throwable): CordaRuntimeException(cause.message, cause)
class DatabaseTransactionException(override val cause: Throwable): CordaRuntimeException(cause.message, cause)
class RolledBackDatabaseSessionException : CordaRuntimeException("Attempted to commit database transaction marked for rollback")

View File

@ -7,13 +7,54 @@ import javax.persistence.EntityManager
*/
class RestrictedEntityManager(private val delegate: EntityManager) : EntityManager by delegate {
override fun getTransaction(): EntityTransaction {
return RestrictedEntityTransaction(delegate.transaction)
}
override fun close() {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun clear() {
override fun <T : Any?> unwrap(cls: Class<T>?): T {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
// TODO: Figure out which other methods on EntityManager need to be blocked?
override fun getDelegate(): Any {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun getMetamodel(): Metamodel? {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun joinTransaction() {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun lock(entity: Any?, lockMode: LockModeType?) {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun lock(entity: Any?, lockMode: LockModeType?, properties: MutableMap<String, Any>?) {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun setProperty(propertyName: String?, value: Any?) {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
}
class RestrictedEntityTransaction(private val delegate: EntityTransaction) : EntityTransaction by delegate {
override fun rollback() {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun commit() {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun begin() {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
}

View File

@ -0,0 +1,58 @@
package net.corda.nodeapi.internal.persistence
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import org.junit.Test
import javax.persistence.EntityManager
import javax.persistence.EntityTransaction
import javax.persistence.LockModeType
import kotlin.test.assertTrue
class RestrictedEntityManagerTest {
private val entitymanager = mock<EntityManager>()
private val transaction = mock<EntityTransaction>()
private val restrictedEntityManager = RestrictedEntityManager(entitymanager)
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
fun testClose() {
restrictedEntityManager.close()
}
@Test(timeout = 300_000)
fun testClear() {
restrictedEntityManager.clear()
}
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
fun testGetMetaModel() {
restrictedEntityManager.getMetamodel()
}
@Test(timeout = 300_000)
fun testGetTransaction() {
whenever(entitymanager.transaction).doReturn(transaction)
assertTrue(restrictedEntityManager.transaction is RestrictedEntityTransaction)
}
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
fun testJoinTransaction() {
restrictedEntityManager.joinTransaction()
}
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
fun testLockWithTwoParameters() {
restrictedEntityManager.lock(Object(), LockModeType.OPTIMISTIC)
}
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
fun testLockWithThreeParameters() {
val map: MutableMap<String,Any> = mutableMapOf()
restrictedEntityManager.lock(Object(), LockModeType.OPTIMISTIC,map)
}
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
fun testSetProperty() {
restrictedEntityManager.setProperty("number", 12)
}
}

View File

@ -0,0 +1,134 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.statemachine.StaffedFlowHospital
import org.junit.Before
import java.util.concurrent.Semaphore
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Table
import kotlin.test.assertEquals
abstract class AbstractFlowEntityManagerTest {
protected companion object {
const val TABLE_NAME = "entity_manager_custom_table"
val entityWithIdOne = CustomTableEntity(1, "Dan", "This won't work")
val anotherEntityWithIdOne = CustomTableEntity(1, "Rick", "I'm pretty sure this will work")
val entityWithIdTwo = CustomTableEntity(2, "Ivan", "This will break existing CorDapps")
val entityWithIdThree = CustomTableEntity(3, "Some other guy", "What am I doing here?")
}
@CordaSerializable
enum class CommitStatus { INTERMEDIATE_COMMIT, NO_INTERMEDIATE_COMMIT }
@Before
open fun before() {
StaffedFlowHospital.onFlowDischarged.clear()
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
}
protected inline fun <reified R : FlowLogic<Any>> CordaRPCOps.expectFlowFailureAndAssertCreatedEntities(
crossinline flow: (CommitStatus) -> R,
commitStatus: CommitStatus,
numberOfDischarges: Int,
numberOfExpectedEntities: Int
): Int {
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
val lock = Semaphore(0)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> lock.release() }
startFlow(flow, commitStatus)
lock.acquire()
assertEquals(
numberOfDischarges,
counter,
"[$commitStatus] expected the flow to be discharged from hospital $numberOfDischarges time(s)"
)
val numberOfEntities = startFlow(::GetCustomEntities).returnValue.getOrThrow().size
assertEquals(
numberOfExpectedEntities,
numberOfEntities,
"[$commitStatus] expected $numberOfExpectedEntities to be saved"
)
startFlow(::DeleteCustomEntities).returnValue.getOrThrow(30.seconds)
return numberOfEntities
}
protected inline fun <reified R : FlowLogic<Any>> CordaRPCOps.expectFlowSuccessAndAssertCreatedEntities(
crossinline flow: (CommitStatus) -> R,
commitStatus: CommitStatus,
numberOfDischarges: Int,
numberOfExpectedEntities: Int
): Int {
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
startFlow(flow, commitStatus).returnValue.getOrThrow(30.seconds)
assertEquals(
numberOfDischarges,
counter,
"[$commitStatus] expected the flow to be discharged from hospital $numberOfDischarges time(s)"
)
val numberOfEntities = startFlow(::GetCustomEntities).returnValue.getOrThrow().size
assertEquals(
numberOfExpectedEntities,
numberOfEntities,
"[$commitStatus] expected $numberOfExpectedEntities to be saved"
)
startFlow(::DeleteCustomEntities).returnValue.getOrThrow(30.seconds)
return numberOfEntities
}
@StartableByRPC
class GetCustomEntities : FlowLogic<List<CustomTableEntity>>() {
@Suspendable
override fun call(): List<CustomTableEntity> {
return serviceHub.withEntityManager {
val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java)
criteria.select(criteria.from(CustomTableEntity::class.java))
createQuery(criteria).resultList
}
}
}
@StartableByRPC
class DeleteCustomEntities : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
val delete = criteriaBuilder.createCriteriaDelete(CustomTableEntity::class.java)
delete.from(CustomTableEntity::class.java)
createQuery(delete).executeUpdate()
}
}
}
@Entity
@Table(name = TABLE_NAME)
@CordaSerializable
data class CustomTableEntity constructor(
@Id
@Column(name = "id", nullable = false)
var id: Int,
@Column(name = "name", nullable = false)
var name: String,
@Column(name = "quote", nullable = false)
var quote: String
)
object CustomSchema
object CustomMappedSchema : MappedSchema(CustomSchema::class.java, 1, listOf(CustomTableEntity::class.java))
}

View File

@ -0,0 +1,374 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.junit.Test
import javax.persistence.PersistenceException
import kotlin.test.assertEquals
class FlowEntityManagerNestedTest : AbstractFlowEntityManagerTest() {
@Test(timeout = 300_000)
fun `entity manager inside an entity manager saves all data`() {
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.startFlow(::EntityManagerInsideAnEntityManagerFlow).returnValue.getOrThrow(20.seconds)
assertEquals(0, counter)
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
assertEquals(2, entities.size)
}
}
@Test(timeout = 300_000)
fun `entity manager inside an entity manager that throws an error does not save any data`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 0
)
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `entity manager that saves an entity with an entity manager inside it that throws an error after saving the entity does not save any data`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 0
)
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it around the entity manager after saving the entity saves the data from the external entity manager`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 2
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 2
)
}
}
@Test(timeout = 300_000)
fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it inside the entity manager after saving the entity saves the data from the external entity manager`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 2
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 2
)
}
}
@Test(timeout = 300_000)
fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it around the entity manager before saving the entity saves the data from the external entity manager`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 2
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 2
)
}
}
@Test(timeout = 300_000)
fun `entity manager with an entity manager inside it saves an entity, outer throws and catches the error outside itself after saving the entity does not save the data from the internal entity manager`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `entity manager with an entity manager inside it saves an entity, outer throws and catches the error inside itself after saving the entity does not save the data from the internal entity manager`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@StartableByRPC
class EntityManagerInsideAnEntityManagerFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
serviceHub.withEntityManager {
persist(entityWithIdTwo)
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow(private val commitStatus: CommitStatus) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(entityWithIdTwo)
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow(
private val commitStatus: CommitStatus
) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(entityWithIdTwo)
try {
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
}
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow(
private val commitStatus: CommitStatus
) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(entityWithIdTwo)
serviceHub.withEntityManager {
try {
persist(anotherEntityWithIdOne)
flush()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow(
private val commitStatus: CommitStatus
) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
try {
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
}
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
persist(entityWithIdTwo)
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow(
private val commitStatus: CommitStatus
) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
try {
serviceHub.withEntityManager {
serviceHub.withEntityManager {
persist(entityWithIdTwo)
}
persist(anotherEntityWithIdOne)
}
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow(
private val commitStatus: CommitStatus
) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
serviceHub.withEntityManager {
persist(entityWithIdTwo)
}
try {
persist(anotherEntityWithIdOne)
flush()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
}
sleep(1.millis)
}
}
}

View File

@ -0,0 +1,309 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.hibernate.exception.ConstraintViolationException
import org.junit.Test
import javax.persistence.PersistenceException
import kotlin.test.assertEquals
class FlowEntityManagerStatementTest : AbstractFlowEntityManagerTest() {
@Test(timeout = 300_000)
fun `data can be saved by a sql statement using entity manager`() {
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.startFlow(::EntityManagerSqlFlow).returnValue.getOrThrow(20.seconds)
assertEquals(0, counter)
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
assertEquals(1, entities.size)
}
}
@Test(timeout = 300_000)
fun `constraint violation caused by a sql statement should save no data`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerErrorFromSqlFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 0
)
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerErrorFromSqlFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation caused by a sql statement that is caught inside an entity manager block saves none of the data inside of it`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
// 1 entity saved from the first entity manager block that does not get rolled back
// even if there is no intermediate commit to the database
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation caused by a sql statement that is caught outside an entity manager block saves none of the data inside of it`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
// 1 entity saved from the first entity manager block that does not get rolled back
// even if there is no intermediate commit to the database
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation caused by a sql statement that is caught inside an entity manager and more data is saved afterwards inside the same entity manager should not save the extra data`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation caused by a sql statement that is caught inside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 2
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 2
)
}
}
@StartableByRPC
class EntityManagerSqlFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
.setParameter("id", anotherEntityWithIdOne.id)
.setParameter("name", anotherEntityWithIdOne.name)
.setParameter("quote", anotherEntityWithIdOne.name)
.executeUpdate()
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerErrorFromSqlFlow(private val commitStatus: CommitStatus) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
.setParameter("id", anotherEntityWithIdOne.id)
.setParameter("name", anotherEntityWithIdOne.name)
.setParameter("quote", anotherEntityWithIdOne.name)
.executeUpdate()
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow(private val commitStatus: CommitStatus) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
try {
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
.setParameter("id", anotherEntityWithIdOne.id)
.setParameter("name", anotherEntityWithIdOne.name)
.setParameter("quote", anotherEntityWithIdOne.name)
.executeUpdate()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow(private val commitStatus: CommitStatus) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
try {
serviceHub.withEntityManager {
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
.setParameter("id", anotherEntityWithIdOne.id)
.setParameter("name", anotherEntityWithIdOne.name)
.setParameter("quote", anotherEntityWithIdOne.name)
.executeUpdate()
}
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow(private val commitStatus: CommitStatus) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
try {
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
.setParameter("id", anotherEntityWithIdOne.id)
.setParameter("name", anotherEntityWithIdOne.name)
.setParameter("quote", anotherEntityWithIdOne.name)
.executeUpdate()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
// These entities are not saved since the transaction is marked for rollback
try {
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
.setParameter("id", entityWithIdTwo.id)
.setParameter("name", entityWithIdTwo.name)
.setParameter("quote", entityWithIdTwo.name)
.executeUpdate()
} catch (e: PersistenceException) {
if (e.cause is ConstraintViolationException) {
throw e
} else {
logger.info(
"""
Caught exception from second sql statement inside the same broken entity manager
This happens if the database has thrown an exception due to rolling back the db transaction
""".trimIndent(), e
)
}
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow(private val commitStatus: CommitStatus) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
try {
serviceHub.withEntityManager {
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
.setParameter("id", anotherEntityWithIdOne.id)
.setParameter("name", anotherEntityWithIdOne.name)
.setParameter("quote", anotherEntityWithIdOne.name)
.executeUpdate()
}
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
serviceHub.withEntityManager {
val query = createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
.setParameter("id", entityWithIdTwo.id)
.setParameter("name", entityWithIdTwo.name)
.setParameter("quote", entityWithIdTwo.name)
query.executeUpdate()
}
sleep(1.millis)
}
}
}

View File

@ -0,0 +1,763 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.CollectSignaturesFlow
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.SignTransactionFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.DummyCommandData
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.hibernate.exception.ConstraintViolationException
import org.junit.Before
import org.junit.Test
import java.sql.Connection
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import javax.persistence.PersistenceException
import kotlin.test.assertEquals
class FlowEntityManagerTest : AbstractFlowEntityManagerTest() {
@Before
override fun before() {
MyService.includeRawUpdates = false
super.before()
}
@Test(timeout = 300_000)
fun `entities can be saved using entity manager without a flush`() {
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.startFlow(::EntityManagerSaveEntitiesWithoutAFlushFlow)
.returnValue.getOrThrow(30.seconds)
assertEquals(0, counter)
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
assertEquals(3, entities.size)
}
}
@Test(timeout = 300_000)
fun `entities can be saved using entity manager with a flush`() {
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.startFlow(::EntityManagerSaveEntitiesWithAFlushFlow)
.returnValue.getOrThrow(30.seconds)
assertEquals(0, counter)
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
assertEquals(3, entities.size)
}
}
@Test(timeout = 300_000)
fun `entities saved inside an entity manager are only committed when a flow suspends`() {
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
var beforeCommitEntities: List<CustomTableEntity>? = null
EntityManagerSaveEntitiesWithoutAFlushFlow.beforeCommitHook = {
beforeCommitEntities = it
}
var afterCommitEntities: List<CustomTableEntity>? = null
EntityManagerSaveEntitiesWithoutAFlushFlow.afterCommitHook = {
afterCommitEntities = it
}
alice.rpc.startFlow(::EntityManagerSaveEntitiesWithoutAFlushFlow)
.returnValue.getOrThrow(30.seconds)
assertEquals(0, counter)
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
assertEquals(3, entities.size)
assertEquals(0, beforeCommitEntities!!.size)
assertEquals(3, afterCommitEntities!!.size)
}
}
@Test(timeout = 300_000)
fun `constraint violation without a flush breaks`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerErrorWithoutAFlushFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 0
)
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerErrorWithoutAFlushFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation with a flush breaks`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerErrorWithAFlushFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 0
)
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerErrorWithAFlushFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation with a flush that is caught inside an entity manager block saves none of the data inside of it`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
// 1 entity saved from the first entity manager block that does not get rolled back
// even if there is no intermediate commit to the database
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation with a flush that is caught outside the entity manager block saves none of the data inside of it`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
// 1 entity saved from the first entity manager block that does not get rolled back
// even if there is no intermediate commit to the database
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation within a single entity manager block throws an exception and saves no data`() {
var dischargeCounter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++dischargeCounter }
val lock = Semaphore(0)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> lock.release() }
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.startFlow(::EntityManagerErrorInsideASingleEntityManagerFlow)
lock.acquire()
// Goes straight to observation due to throwing [EntityExistsException]
assertEquals(0, dischargeCounter)
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
assertEquals(0, entities.size)
}
}
@Test(timeout = 300_000)
fun `constraint violation on a single entity when saving multiple entities throws an exception and does not save any data within the entity manager block`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerSavingMultipleEntitiesWithASingleErrorFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 0
)
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
flow = ::EntityManagerSavingMultipleEntitiesWithASingleErrorFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 3,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation on a single entity when saving multiple entities and catching the error does not save any data within the entity manager block`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
// 1 entity saved from the first entity manager block that does not get rolled back
// even if there is no intermediate commit to the database
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation that is caught inside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 3
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 3
)
}
}
@Test(timeout = 300_000)
fun `constraint violation that is caught inside an entity manager and more data is saved afterwards inside the same entity manager should not save the extra data`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 1
)
}
}
@Test(timeout = 300_000)
fun `constraint violation that is caught outside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager,
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 3
)
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
flow = ::EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager,
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
numberOfDischarges = 0,
numberOfExpectedEntities = 3
)
}
}
@Test(timeout = 300_000)
fun `constraint violation that is caught inside an entity manager should allow a flow to continue processing as normal`() {
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val txId =
alice.rpc.startFlow(::EntityManagerWithFlushCatchAndInteractWithOtherPartyFlow, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(20.seconds)
assertEquals(0, counter)
val txFromVault = alice.rpc.stateMachineRecordedTransactionMappingSnapshot().firstOrNull()?.transactionId
assertEquals(txId, txFromVault)
val entity = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow().single()
assertEquals(entityWithIdOne, entity)
}
}
@Test(timeout = 300_000)
fun `data saved from an entity manager vault update should be visible within an entity manager block inside the same database transaction`() {
MyService.includeRawUpdates = true
MyService.insertionType = MyService.InsertionType.ENTITY_MANAGER
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val entities =
alice.rpc.startFlow(::EntityManagerWithinTheSameDatabaseTransactionFlow).returnValue.getOrThrow(20.seconds)
assertEquals(3, entities.size)
assertEquals(0, counter)
}
}
@Test(timeout = 300_000)
fun `data saved from a jdbc connection vault update should be visible within an entity manager block inside the same database transaction`() {
MyService.includeRawUpdates = true
MyService.insertionType = MyService.InsertionType.CONNECTION
var counter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val entities =
alice.rpc.startFlow(::EntityManagerWithinTheSameDatabaseTransactionFlow).returnValue.getOrThrow(20.seconds)
assertEquals(3, entities.size)
assertEquals(0, counter)
}
}
@StartableByRPC
class EntityManagerSaveEntitiesWithoutAFlushFlow : FlowLogic<Unit>() {
companion object {
var beforeCommitHook: ((entities: List<CustomTableEntity>) -> Unit)? = null
var afterCommitHook: ((entities: List<CustomTableEntity>) -> Unit)? = null
}
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
persist(entityWithIdTwo)
persist(entityWithIdThree)
}
beforeCommitHook?.invoke(serviceHub.cordaService(MyService::class.java).getEntities())
sleep(1.millis)
afterCommitHook?.invoke(serviceHub.cordaService(MyService::class.java).getEntities())
}
}
@StartableByRPC
class EntityManagerSaveEntitiesWithAFlushFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
persist(entityWithIdTwo)
persist(entityWithIdThree)
flush()
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerErrorWithoutAFlushFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerErrorWithAFlushFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
flush()
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
try {
flush()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
try {
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
flush()
}
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerErrorInsideASingleEntityManagerFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
persist(anotherEntityWithIdOne)
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerSavingMultipleEntitiesWithASingleErrorFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
persist(entityWithIdTwo)
persist(entityWithIdThree)
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
persist(entityWithIdTwo)
persist(entityWithIdThree)
try {
flush()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
try {
flush()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(entityWithIdTwo)
persist(entityWithIdThree)
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
try {
flush()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
// These entities are not saved since the transaction is marked for rollback
try {
persist(entityWithIdTwo)
persist(entityWithIdThree)
} catch (e: PersistenceException) {
if (e.cause is ConstraintViolationException) {
throw e
} else {
logger.info(
"""
Caught exception from second set of persists inside the same broken entity manager
This happens if the database has thrown an exception due to rolling back the db transaction
""".trimIndent(), e
)
}
}
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager(private val commitStatus: CommitStatus) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
try {
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
}
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
sleep(1.millis)
}
serviceHub.withEntityManager {
persist(entityWithIdTwo)
persist(entityWithIdThree)
}
sleep(1.millis)
}
}
@StartableByRPC
class EntityManagerWithFlushCatchAndInteractWithOtherPartyFlow(private val party: Party) : FlowLogic<SecureHash>() {
@Suspendable
override fun call(): SecureHash {
serviceHub.withEntityManager {
persist(entityWithIdOne)
}
serviceHub.withEntityManager {
persist(anotherEntityWithIdOne)
try {
flush()
} catch (e: PersistenceException) {
logger.info("Caught the exception!")
}
}
return subFlow(CreateATransactionFlow(party))
}
}
@InitiatingFlow
class CreateATransactionFlow(val party: Party) : FlowLogic<SecureHash>() {
@Suspendable
override fun call(): SecureHash {
val session = initiateFlow(party)
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(DummyState(participants = listOf(ourIdentity, party)))
addCommand(DummyCommandData, ourIdentity.owningKey, party.owningKey)
}
val stx = serviceHub.signInitialTransaction(tx)
val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session)))
return subFlow(FinalityFlow(ftx, session)).id
}
}
@InitiatedBy(CreateATransactionFlow::class)
class CreateATransactionResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val stx = subFlow(object : SignTransactionFlow(session) {
override fun checkTransaction(stx: SignedTransaction) {
}
})
subFlow(ReceiveFinalityFlow(session, stx.id))
}
}
@StartableByRPC
class EntityManagerWithinTheSameDatabaseTransactionFlow : FlowLogic<List<CustomTableEntity>>() {
@Suspendable
override fun call(): List<CustomTableEntity> {
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(DummyState(participants = listOf(ourIdentity)))
addCommand(DummyCommandData, ourIdentity.owningKey)
}
val stx = serviceHub.signInitialTransaction(tx)
serviceHub.recordTransactions(stx)
return serviceHub.withEntityManager {
val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java)
criteria.select(criteria.from(CustomTableEntity::class.java))
createQuery(criteria).resultList
}
}
}
@CordaService
class MyService(private val services: AppServiceHub) : SingletonSerializeAsToken() {
companion object {
var includeRawUpdates = false
var insertionType = InsertionType.ENTITY_MANAGER
}
enum class InsertionType { ENTITY_MANAGER, CONNECTION }
val executors: ExecutorService = Executors.newFixedThreadPool(1)
init {
if (includeRawUpdates) {
services.register {
services.vaultService.rawUpdates.subscribe {
if (insertionType == InsertionType.ENTITY_MANAGER) {
services.withEntityManager {
persist(entityWithIdOne)
persist(entityWithIdTwo)
persist(entityWithIdThree)
}
} else {
services.jdbcSession().run {
insert(entityWithIdOne)
insert(entityWithIdTwo)
insert(entityWithIdThree)
}
}
}
}
}
}
private fun Connection.insert(entity: CustomTableEntity) {
prepareStatement("INSERT INTO $TABLE_NAME VALUES (?, ?, ?)").apply {
setInt(1, entity.id)
setString(2, entity.name)
setString(3, entity.quote)
}.executeUpdate()
}
fun getEntities(): List<CustomTableEntity> {
return executors.submit<List<CustomTableEntity>> {
services.database.transaction {
session.run {
val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java)
criteria.select(criteria.from(CustomTableEntity::class.java))
createQuery(criteria).resultList
}
}
}.get()
}
}
}

View File

@ -159,7 +159,9 @@ import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.nodeapi.internal.persistence.OutstandingDatabaseChangesException
import net.corda.nodeapi.internal.persistence.RestrictedEntityManager
import net.corda.nodeapi.internal.persistence.SchemaMigration
import net.corda.tools.shell.InteractiveShell
import org.apache.activemq.artemis.utils.ReusableLatch
@ -174,6 +176,7 @@ import java.security.KeyPair
import java.security.KeyStoreException
import java.security.cert.X509Certificate
import java.sql.Connection
import java.sql.Savepoint
import java.time.Clock
import java.time.Duration
import java.time.format.DateTimeParseException
@ -187,6 +190,7 @@ import java.util.concurrent.TimeUnit.MINUTES
import java.util.concurrent.TimeUnit.SECONDS
import java.util.function.Consumer
import javax.persistence.EntityManager
import javax.persistence.PersistenceException
import kotlin.collections.ArrayList
/**
@ -1163,12 +1167,52 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
override fun jdbcSession(): Connection = database.createSession()
override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T {
return database.transaction {
return database.transaction(useErrorHandler = false) {
session.flush()
block(restrictedEntityManager)
val manager = entityManager
withSavePoint { savepoint ->
// Restrict what entity manager they can use inside the block
try {
block(RestrictedEntityManager(manager)).also {
if (!manager.transaction.rollbackOnly) {
manager.flush()
} else {
connection.rollback(savepoint)
}
}
} catch (e: PersistenceException) {
if (manager.transaction.rollbackOnly) {
connection.rollback(savepoint)
}
throw e
} finally {
manager.close()
}
}
}
}
private fun <T: Any?> DatabaseTransaction.withSavePoint(block: (savepoint: Savepoint) -> T) : T {
val savepoint = connection.setSavepoint()
return try {
block(savepoint)
} finally {
// Release the save point even if we occur an error
if (savepoint.supportsReleasing()) {
connection.releaseSavepoint(savepoint)
}
}
}
/**
* Not all databases support releasing of savepoints.
* The type of savepoints are referenced by string names since we do not have access to the JDBC drivers
* at compile time.
*/
private fun Savepoint.supportsReleasing(): Boolean {
return this::class.simpleName != "SQLServerSavepoint" && this::class.simpleName != "OracleSavepoint"
}
override fun withEntityManager(block: Consumer<EntityManager>) {
withEntityManager {
block.accept(this)

View File

@ -223,7 +223,10 @@ class ActionExecutorImpl(
@Suspendable
private fun executeRollbackTransaction() {
contextTransactionOrNull?.close()
contextTransactionOrNull?.run {
rollback()
close()
}
}
@Suspendable

View File

@ -5,6 +5,7 @@ import net.corda.core.utilities.loggerFor
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.RolledBackDatabaseSessionException
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -180,6 +181,8 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
} catch (t: PersistenceException) {
// This only helps if thrown on commit, otherwise other latches not counted down.
assertEquals(t.message, Outcome.SuccessButErrorOnCommit, a.outcome)
} catch (t: RolledBackDatabaseSessionException) {
assertEquals(t.message, Outcome.SuccessButErrorOnCommit, a.outcome)
}
a.await(a::phase4)
b.await(b::phase4)

View File

@ -1,7 +1,12 @@
package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.services.queryBy
@ -50,13 +55,11 @@ class VaultFlowTest {
@After
fun tearDown() {
mockNetwork.stopNodes()
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
}
@Test(timeout=300_000)
fun `Unique column constraint failing causes states to not persist to vaults`() {
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add( { t: Throwable -> t is javax.persistence.PersistenceException })
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
val hospitalLatch = CountDownLatch(1)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> hospitalLatch.countDown() }

View File

@ -236,11 +236,11 @@ open class MockServices private constructor(
override fun jdbcSession(): Connection = persistence.createSession()
override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T {
return block(contextTransaction.restrictedEntityManager)
return block(contextTransaction.entityManager)
}
override fun withEntityManager(block: Consumer<EntityManager>) {
return block.accept(contextTransaction.restrictedEntityManager)
return block.accept(contextTransaction.entityManager)
}
}
}