mirror of
https://github.com/corda/corda.git
synced 2024-12-19 04:57:58 +00:00
CORDA-3722 withEntityManager can rollback its session (#6187)
* CORDA-3722 withEntityManager can rollback its session ## Summary Improve the handling of database transactions when using `withEntityManager` inside a flow. Extra changes have been included to improve the safety and correctness of Corda around handling database transactions. This focuses on allowing flows to catch errors that occur inside an entity manager and handle them accordingly. Errors can be caught in two places: - Inside `withEntityManager` - Outside `withEntityManager` Further changes have been included to ensure that transactions are rolled back correctly. ## Catching errors inside `withEntityManager` Errors caught inside `withEntityManager` require the flow to manually `flush` the current session (the entity manager's individual session). By manually flushing the session, a `try-catch` block can be placed around the `flush` call, allowing possible exceptions to be caught. Once an error is thrown from a call to `flush`, it is no longer possible to use the same entity manager to trigger any database operations. The only possible option is to rollback the changes from that session. The flow can continue executing updates within the same session but they will never be committed. What happens in this situation should be handled by the flow. Explicitly restricting the scenario requires a lot of effort and code. Instead, we should rely on the developer to control complex workflows. To continue updating the database after an error like this occurs, a new `withEntityManager` block should be used (after catching the previous error). ## Catching errors outside `withEntityManager` Exceptions can be caught around `withEntityManager` blocks. This allows errors to be handled in the same way as stated above, except the need to manually `flush` the session is removed. `withEntityManager` will automatically `flush` a session if it has not been marked for rollback due to an earlier error. A `try-catch` can then be placed around the whole of the `withEntityManager` block, allowing the error to be caught while not committing any changes to the underlying database transaction. ## Savepoints / Transactionality To make `withEntityManager` blocks work like mini database transactions, save points have been utilised. A new savepoint is created when opening a `withEntityManager` block (along with a new session). It is then used as a reference point to rollback to if the session errors and needs to roll back. The savepoint is then released (independently from completing successfully or failing). Using save points means, that either all the statements inside the entity manager are executed, or none of them are. ## Some implementation details - A new session is created every time an entity manager is requested, but this does not replace the flow's main underlying database session. - `CordaPersistence.transaction` can now determine whether it needs to execute its extra error handling code. This is needed to allow errors escape `withEntityManager` blocks while allowing some of our exception handling around subscribers (in `NodeVaultService`) to continue to work.
This commit is contained in:
parent
ec96a844bd
commit
9a2ae8ae19
@ -381,7 +381,7 @@ interface ServiceHub : ServicesForResolution {
|
||||
* and thus queryable data will include everything committed as of the last checkpoint.
|
||||
*
|
||||
* We want to make sure users have a restricted access to administrative functions, this function will return a [RestrictedConnection] instance.
|
||||
* The blocked methods are the following:
|
||||
* The following methods are blocked:
|
||||
* - abort(executor: Executor?)
|
||||
* - clearWarnings()
|
||||
* - close()
|
||||
@ -415,15 +415,22 @@ interface ServiceHub : ServicesForResolution {
|
||||
* @param block a lambda function with access to an [EntityManager].
|
||||
*
|
||||
* We want to make sure users have a restricted access to administrative functions.
|
||||
* The blocked methods are the following:
|
||||
* The following methods are blocked:
|
||||
* - close()
|
||||
* - clear()
|
||||
* - unwrap(cls: Class<T>?)
|
||||
* - getDelegate(): Any
|
||||
* - getMetamodel()
|
||||
* - getTransaction()
|
||||
* - joinTransaction()
|
||||
* - lock(entity: Any?, lockMode: LockModeType?)
|
||||
* - lock(entity: Any?, lockMode: LockModeType?, properties: MutableMap<String, Any>?)
|
||||
* - setProperty(propertyName: String?, value: Any?)
|
||||
*
|
||||
* getTransaction returns a [RestrictedEntityTransaction] to prevent unsafe manipulation of a flow's underlying
|
||||
* database transaction.
|
||||
* The following methods are blocked:
|
||||
* - begin()
|
||||
* - commit()
|
||||
* - rollback()
|
||||
*/
|
||||
fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T
|
||||
|
||||
@ -435,6 +442,24 @@ interface ServiceHub : ServicesForResolution {
|
||||
* NOTE: Suspendable flow operations such as send, receive, subFlow and sleep, cannot be called within the lambda.
|
||||
*
|
||||
* @param block a lambda function with access to an [EntityManager].
|
||||
*
|
||||
* We want to make sure users have a restricted access to administrative functions.
|
||||
* The following methods are blocked:
|
||||
* - close()
|
||||
* - unwrap(cls: Class<T>?)
|
||||
* - getDelegate(): Any
|
||||
* - getMetamodel()
|
||||
* - joinTransaction()
|
||||
* - lock(entity: Any?, lockMode: LockModeType?)
|
||||
* - lock(entity: Any?, lockMode: LockModeType?, properties: MutableMap<String, Any>?)
|
||||
* - setProperty(propertyName: String?, value: Any?)
|
||||
*
|
||||
* getTransaction returns a [RestrictedEntityTransaction] to prevent unsafe manipulation of a flow's underlying
|
||||
* database transaction.
|
||||
* The following methods are blocked:
|
||||
* - begin()
|
||||
* - commit()
|
||||
* - rollback()
|
||||
*/
|
||||
fun withEntityManager(block: Consumer<EntityManager>)
|
||||
|
||||
|
@ -211,14 +211,15 @@ class CordaPersistence(
|
||||
* @param isolationLevel isolation level for the transaction.
|
||||
* @param statement to be executed in the scope of this transaction.
|
||||
*/
|
||||
fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T =
|
||||
transaction(isolationLevel, 2, false, statement)
|
||||
fun <T> transaction(isolationLevel: TransactionIsolationLevel, useErrorHandler: Boolean, statement: DatabaseTransaction.() -> T): T =
|
||||
transaction(isolationLevel, 2, false, useErrorHandler, statement)
|
||||
|
||||
/**
|
||||
* Executes given statement in the scope of transaction with the transaction level specified at the creation time.
|
||||
* @param statement to be executed in the scope of this transaction.
|
||||
*/
|
||||
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement)
|
||||
@JvmOverloads
|
||||
fun <T> transaction(useErrorHandler: Boolean = true, statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, useErrorHandler, statement)
|
||||
|
||||
/**
|
||||
* Executes given statement in the scope of transaction, with the given isolation level.
|
||||
@ -228,7 +229,7 @@ class CordaPersistence(
|
||||
* @param statement to be executed in the scope of this transaction.
|
||||
*/
|
||||
fun <T> transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int,
|
||||
recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T {
|
||||
recoverAnyNestedSQLException: Boolean, useErrorHandler: Boolean, statement: DatabaseTransaction.() -> T): T {
|
||||
_contextDatabase.set(this)
|
||||
val outer = contextTransactionOrNull
|
||||
return if (outer != null) {
|
||||
@ -237,26 +238,34 @@ class CordaPersistence(
|
||||
// previously been created by the flow state machine in ActionExecutorImpl#executeCreateTransaction
|
||||
// b. exceptions coming out from top level transactions are already being handled in CordaPersistence#inTopLevelTransaction
|
||||
// i.e. roll back and close the transaction
|
||||
try {
|
||||
if(useErrorHandler) {
|
||||
outer.withErrorHandler(statement)
|
||||
} else {
|
||||
outer.statement()
|
||||
} catch (e: Exception) {
|
||||
if (e is SQLException || e is PersistenceException || e is HospitalizeFlowException) {
|
||||
outer.errorHandler(e)
|
||||
}
|
||||
throw e
|
||||
}
|
||||
} else {
|
||||
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T> DatabaseTransaction.withErrorHandler(statement: DatabaseTransaction.() -> T): T {
|
||||
return try {
|
||||
statement()
|
||||
} catch (e: Exception) {
|
||||
if ((e is SQLException || e is PersistenceException || e is HospitalizeFlowException)) {
|
||||
errorHandler(e)
|
||||
}
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes given statement in the scope of transaction with the transaction level specified at the creation time.
|
||||
* @param statement to be executed in the scope of this transaction.
|
||||
* @param recoverableFailureTolerance number of transaction commit retries for SQL while SQL exception is encountered.
|
||||
*/
|
||||
fun <T> transaction(recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
|
||||
return transaction(defaultIsolationLevel, recoverableFailureTolerance, false, statement)
|
||||
return transaction(defaultIsolationLevel, recoverableFailureTolerance, false, false, statement)
|
||||
}
|
||||
|
||||
private fun <T> inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int,
|
||||
|
@ -39,9 +39,13 @@ class DatabaseTransaction(
|
||||
}
|
||||
|
||||
// Returns a delegate which overrides certain operations that we do not want CorDapp developers to call.
|
||||
val restrictedEntityManager: RestrictedEntityManager by lazy {
|
||||
val entityManager = session as EntityManager
|
||||
RestrictedEntityManager(entityManager)
|
||||
|
||||
val entityManager: EntityManager get() {
|
||||
// Always retrieve new session ([Session] implements [EntityManager])
|
||||
// Note, this does not replace the top level hibernate session
|
||||
val session = database.entityManagerFactory.withOptions().connection(connection).openSession()
|
||||
session.beginTransaction()
|
||||
return session
|
||||
}
|
||||
|
||||
val session: Session by sessionDelegate
|
||||
@ -73,6 +77,10 @@ class DatabaseTransaction(
|
||||
throw DatabaseTransactionException(it)
|
||||
}
|
||||
if (sessionDelegate.isInitialized()) {
|
||||
// The [sessionDelegate] must be initialised otherwise calling [entityManager] will cause an exception
|
||||
if(session.transaction.rollbackOnly) {
|
||||
throw RolledBackDatabaseSessionException()
|
||||
}
|
||||
hibernateTransaction.commit()
|
||||
}
|
||||
connection.commit()
|
||||
@ -124,4 +132,6 @@ class DatabaseTransaction(
|
||||
/**
|
||||
* Wrapper exception, for any exception registered as [DatabaseTransaction.firstExceptionInDatabaseTransaction].
|
||||
*/
|
||||
class DatabaseTransactionException(override val cause: Throwable): CordaRuntimeException(cause.message, cause)
|
||||
class DatabaseTransactionException(override val cause: Throwable): CordaRuntimeException(cause.message, cause)
|
||||
|
||||
class RolledBackDatabaseSessionException : CordaRuntimeException("Attempted to commit database transaction marked for rollback")
|
@ -10,11 +10,19 @@ import javax.persistence.metamodel.Metamodel
|
||||
*/
|
||||
class RestrictedEntityManager(private val delegate: EntityManager) : EntityManager by delegate {
|
||||
|
||||
override fun getTransaction(): EntityTransaction {
|
||||
return RestrictedEntityTransaction(delegate.transaction)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
|
||||
override fun clear() {
|
||||
override fun <T : Any?> unwrap(cls: Class<T>?): T {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
|
||||
override fun getDelegate(): Any {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
|
||||
@ -22,10 +30,6 @@ class RestrictedEntityManager(private val delegate: EntityManager) : EntityManag
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
|
||||
override fun getTransaction(): EntityTransaction? {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
|
||||
override fun joinTransaction() {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
@ -41,5 +45,19 @@ class RestrictedEntityManager(private val delegate: EntityManager) : EntityManag
|
||||
override fun setProperty(propertyName: String?, value: Any?) {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
}
|
||||
|
||||
class RestrictedEntityTransaction(private val delegate: EntityTransaction) : EntityTransaction by delegate {
|
||||
|
||||
override fun rollback() {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
|
||||
override fun commit() {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
|
||||
override fun begin() {
|
||||
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
|
||||
}
|
||||
}
|
@ -1,12 +1,17 @@
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import org.junit.Test
|
||||
import javax.persistence.EntityManager
|
||||
import javax.persistence.EntityTransaction
|
||||
import javax.persistence.LockModeType
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class RestrtictedEntityManagerTest {
|
||||
class RestrictedEntityManagerTest {
|
||||
private val entitymanager = mock<EntityManager>()
|
||||
private val transaction = mock<EntityTransaction>()
|
||||
private val restrictedEntityManager = RestrictedEntityManager(entitymanager)
|
||||
|
||||
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
|
||||
@ -14,7 +19,7 @@ class RestrtictedEntityManagerTest {
|
||||
restrictedEntityManager.close()
|
||||
}
|
||||
|
||||
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun testClear() {
|
||||
restrictedEntityManager.clear()
|
||||
}
|
||||
@ -24,9 +29,10 @@ class RestrtictedEntityManagerTest {
|
||||
restrictedEntityManager.getMetamodel()
|
||||
}
|
||||
|
||||
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun testGetTransaction() {
|
||||
restrictedEntityManager.getTransaction()
|
||||
whenever(entitymanager.transaction).doReturn(transaction)
|
||||
assertTrue(restrictedEntityManager.transaction is RestrictedEntityTransaction)
|
||||
}
|
||||
|
||||
@Test(expected = UnsupportedOperationException::class, timeout=300_000)
|
@ -0,0 +1,134 @@
|
||||
package net.corda.node.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import org.junit.Before
|
||||
import java.util.concurrent.Semaphore
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
import javax.persistence.Table
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
abstract class AbstractFlowEntityManagerTest {
|
||||
|
||||
protected companion object {
|
||||
|
||||
const val TABLE_NAME = "entity_manager_custom_table"
|
||||
|
||||
val entityWithIdOne = CustomTableEntity(1, "Dan", "This won't work")
|
||||
val anotherEntityWithIdOne = CustomTableEntity(1, "Rick", "I'm pretty sure this will work")
|
||||
val entityWithIdTwo = CustomTableEntity(2, "Ivan", "This will break existing CorDapps")
|
||||
val entityWithIdThree = CustomTableEntity(3, "Some other guy", "What am I doing here?")
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
enum class CommitStatus { INTERMEDIATE_COMMIT, NO_INTERMEDIATE_COMMIT }
|
||||
|
||||
@Before
|
||||
open fun before() {
|
||||
StaffedFlowHospital.onFlowDischarged.clear()
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
||||
}
|
||||
|
||||
protected inline fun <reified R : FlowLogic<Any>> CordaRPCOps.expectFlowFailureAndAssertCreatedEntities(
|
||||
crossinline flow: (CommitStatus) -> R,
|
||||
commitStatus: CommitStatus,
|
||||
numberOfDischarges: Int,
|
||||
numberOfExpectedEntities: Int
|
||||
): Int {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
val lock = Semaphore(0)
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> lock.release() }
|
||||
startFlow(flow, commitStatus)
|
||||
lock.acquire()
|
||||
assertEquals(
|
||||
numberOfDischarges,
|
||||
counter,
|
||||
"[$commitStatus] expected the flow to be discharged from hospital $numberOfDischarges time(s)"
|
||||
)
|
||||
val numberOfEntities = startFlow(::GetCustomEntities).returnValue.getOrThrow().size
|
||||
assertEquals(
|
||||
numberOfExpectedEntities,
|
||||
numberOfEntities,
|
||||
"[$commitStatus] expected $numberOfExpectedEntities to be saved"
|
||||
)
|
||||
startFlow(::DeleteCustomEntities).returnValue.getOrThrow(30.seconds)
|
||||
return numberOfEntities
|
||||
}
|
||||
|
||||
protected inline fun <reified R : FlowLogic<Any>> CordaRPCOps.expectFlowSuccessAndAssertCreatedEntities(
|
||||
crossinline flow: (CommitStatus) -> R,
|
||||
commitStatus: CommitStatus,
|
||||
numberOfDischarges: Int,
|
||||
numberOfExpectedEntities: Int
|
||||
): Int {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
startFlow(flow, commitStatus).returnValue.getOrThrow(30.seconds)
|
||||
assertEquals(
|
||||
numberOfDischarges,
|
||||
counter,
|
||||
"[$commitStatus] expected the flow to be discharged from hospital $numberOfDischarges time(s)"
|
||||
)
|
||||
val numberOfEntities = startFlow(::GetCustomEntities).returnValue.getOrThrow().size
|
||||
assertEquals(
|
||||
numberOfExpectedEntities,
|
||||
numberOfEntities,
|
||||
"[$commitStatus] expected $numberOfExpectedEntities to be saved"
|
||||
)
|
||||
startFlow(::DeleteCustomEntities).returnValue.getOrThrow(30.seconds)
|
||||
return numberOfEntities
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class GetCustomEntities : FlowLogic<List<CustomTableEntity>>() {
|
||||
@Suspendable
|
||||
override fun call(): List<CustomTableEntity> {
|
||||
return serviceHub.withEntityManager {
|
||||
val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java)
|
||||
criteria.select(criteria.from(CustomTableEntity::class.java))
|
||||
createQuery(criteria).resultList
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class DeleteCustomEntities : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
val delete = criteriaBuilder.createCriteriaDelete(CustomTableEntity::class.java)
|
||||
delete.from(CustomTableEntity::class.java)
|
||||
createQuery(delete).executeUpdate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Entity
|
||||
@Table(name = TABLE_NAME)
|
||||
@CordaSerializable
|
||||
data class CustomTableEntity constructor(
|
||||
@Id
|
||||
@Column(name = "id", nullable = false)
|
||||
var id: Int,
|
||||
@Column(name = "name", nullable = false)
|
||||
var name: String,
|
||||
@Column(name = "quote", nullable = false)
|
||||
var quote: String
|
||||
)
|
||||
|
||||
object CustomSchema
|
||||
|
||||
object CustomMappedSchema : MappedSchema(CustomSchema::class.java, 1, listOf(CustomTableEntity::class.java))
|
||||
}
|
@ -0,0 +1,374 @@
|
||||
package net.corda.node.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import org.junit.Test
|
||||
import javax.persistence.PersistenceException
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class FlowEntityManagerNestedTest : AbstractFlowEntityManagerTest() {
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entity manager inside an entity manager saves all data`() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
|
||||
alice.rpc.startFlow(::EntityManagerInsideAnEntityManagerFlow).returnValue.getOrThrow(20.seconds)
|
||||
assertEquals(0, counter)
|
||||
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
|
||||
assertEquals(2, entities.size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entity manager inside an entity manager that throws an error does not save any data`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 0
|
||||
)
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entity manager that saves an entity with an entity manager inside it that throws an error after saving the entity does not save any data`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 0
|
||||
)
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it around the entity manager after saving the entity saves the data from the external entity manager`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 2
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 2
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it inside the entity manager after saving the entity saves the data from the external entity manager`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 2
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 2
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entity manager that saves an entity with an entity manager inside it that throws an error and catching it around the entity manager before saving the entity saves the data from the external entity manager`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 2
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 2
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entity manager with an entity manager inside it saves an entity, outer throws and catches the error outside itself after saving the entity does not save the data from the internal entity manager`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entity manager with an entity manager inside it saves an entity, outer throws and catches the error inside itself after saving the entity does not save the data from the internal entity manager`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerInsideAnEntityManagerFlow : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdTwo)
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerInsideAnEntityManagerThatThrowsAnExceptionFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAfterSavingFlow(private val commitStatus: CommitStatus) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdTwo)
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerAfterSavingFlow(
|
||||
private val commitStatus: CommitStatus
|
||||
) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdTwo)
|
||||
try {
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesInsideTheEntityManagerAfterSavingFlow(
|
||||
private val commitStatus: CommitStatus
|
||||
) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdTwo)
|
||||
serviceHub.withEntityManager {
|
||||
try {
|
||||
persist(anotherEntityWithIdOne)
|
||||
flush()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerThatSavesAnEntityWithAnEntityManagerInsideItThatThrowsAnExceptionAndCatchesAroundTheEntityManagerBeforeSavingFlow(
|
||||
private val commitStatus: CommitStatus
|
||||
) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
try {
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
persist(entityWithIdTwo)
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesAroundOuterEntityManagerAfterSavingFlow(
|
||||
private val commitStatus: CommitStatus
|
||||
) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
try {
|
||||
serviceHub.withEntityManager {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdTwo)
|
||||
}
|
||||
persist(anotherEntityWithIdOne)
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerThatSavesAnEntityUsingInternalEntityManagerAndThrowsFromOuterAndCatchesInsideOuterEntityManagerAfterSavingFlow(
|
||||
private val commitStatus: CommitStatus
|
||||
) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdTwo)
|
||||
}
|
||||
try {
|
||||
persist(anotherEntityWithIdOne)
|
||||
flush()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,309 @@
|
||||
package net.corda.node.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import org.hibernate.exception.ConstraintViolationException
|
||||
import org.junit.Test
|
||||
import javax.persistence.PersistenceException
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class FlowEntityManagerStatementTest : AbstractFlowEntityManagerTest() {
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `data can be saved by a sql statement using entity manager`() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
|
||||
alice.rpc.startFlow(::EntityManagerSqlFlow).returnValue.getOrThrow(20.seconds)
|
||||
assertEquals(0, counter)
|
||||
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
|
||||
assertEquals(1, entities.size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation caused by a sql statement should save no data`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerErrorFromSqlFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 0
|
||||
)
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerErrorFromSqlFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation caused by a sql statement that is caught inside an entity manager block saves none of the data inside of it`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
// 1 entity saved from the first entity manager block that does not get rolled back
|
||||
// even if there is no intermediate commit to the database
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation caused by a sql statement that is caught outside an entity manager block saves none of the data inside of it`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
// 1 entity saved from the first entity manager block that does not get rolled back
|
||||
// even if there is no intermediate commit to the database
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation caused by a sql statement that is caught inside an entity manager and more data is saved afterwards inside the same entity manager should not save the extra data`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation caused by a sql statement that is caught inside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 2
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 2
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerSqlFlow : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
|
||||
.setParameter("id", anotherEntityWithIdOne.id)
|
||||
.setParameter("name", anotherEntityWithIdOne.name)
|
||||
.setParameter("quote", anotherEntityWithIdOne.name)
|
||||
.executeUpdate()
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerErrorFromSqlFlow(private val commitStatus: CommitStatus) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
|
||||
.setParameter("id", anotherEntityWithIdOne.id)
|
||||
.setParameter("name", anotherEntityWithIdOne.name)
|
||||
.setParameter("quote", anotherEntityWithIdOne.name)
|
||||
.executeUpdate()
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerCatchErrorFromSqlInsideTheEntityManagerFlow(private val commitStatus: CommitStatus) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
try {
|
||||
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
|
||||
.setParameter("id", anotherEntityWithIdOne.id)
|
||||
.setParameter("name", anotherEntityWithIdOne.name)
|
||||
.setParameter("quote", anotherEntityWithIdOne.name)
|
||||
.executeUpdate()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerCatchErrorFromSqlOutsideTheEntityManagerFlow(private val commitStatus: CommitStatus) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
try {
|
||||
serviceHub.withEntityManager {
|
||||
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
|
||||
.setParameter("id", anotherEntityWithIdOne.id)
|
||||
.setParameter("name", anotherEntityWithIdOne.name)
|
||||
.setParameter("quote", anotherEntityWithIdOne.name)
|
||||
.executeUpdate()
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInTheSameEntityManagerFlow(private val commitStatus: CommitStatus) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
try {
|
||||
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
|
||||
.setParameter("id", anotherEntityWithIdOne.id)
|
||||
.setParameter("name", anotherEntityWithIdOne.name)
|
||||
.setParameter("quote", anotherEntityWithIdOne.name)
|
||||
.executeUpdate()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
// These entities are not saved since the transaction is marked for rollback
|
||||
try {
|
||||
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
|
||||
.setParameter("id", entityWithIdTwo.id)
|
||||
.setParameter("name", entityWithIdTwo.name)
|
||||
.setParameter("quote", entityWithIdTwo.name)
|
||||
.executeUpdate()
|
||||
} catch (e: PersistenceException) {
|
||||
if (e.cause is ConstraintViolationException) {
|
||||
throw e
|
||||
} else {
|
||||
logger.info(
|
||||
"""
|
||||
Caught exception from second sql statement inside the same broken entity manager
|
||||
This happens if the database has thrown an exception due to rolling back the db transaction
|
||||
""".trimIndent(), e
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerCatchErrorFromSqlAndSaveMoreEntitiesInNewEntityManagerFlow(private val commitStatus: CommitStatus) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
try {
|
||||
serviceHub.withEntityManager {
|
||||
createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
|
||||
.setParameter("id", anotherEntityWithIdOne.id)
|
||||
.setParameter("name", anotherEntityWithIdOne.name)
|
||||
.setParameter("quote", anotherEntityWithIdOne.name)
|
||||
.executeUpdate()
|
||||
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
val query = createNativeQuery("INSERT INTO $TABLE_NAME VALUES (:id, :name, :quote)")
|
||||
.setParameter("id", entityWithIdTwo.id)
|
||||
.setParameter("name", entityWithIdTwo.name)
|
||||
.setParameter("quote", entityWithIdTwo.name)
|
||||
query.executeUpdate()
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,763 @@
|
||||
package net.corda.node.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.CollectSignaturesFlow
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveFinalityFlow
|
||||
import net.corda.core.flows.SignTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.testing.contracts.DummyState
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DummyCommandData
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import org.hibernate.exception.ConstraintViolationException
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.Semaphore
|
||||
import javax.persistence.PersistenceException
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class FlowEntityManagerTest : AbstractFlowEntityManagerTest() {
|
||||
|
||||
@Before
|
||||
override fun before() {
|
||||
MyService.includeRawUpdates = false
|
||||
super.before()
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entities can be saved using entity manager without a flush`() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.startFlow(::EntityManagerSaveEntitiesWithoutAFlushFlow)
|
||||
.returnValue.getOrThrow(30.seconds)
|
||||
assertEquals(0, counter)
|
||||
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
|
||||
assertEquals(3, entities.size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entities can be saved using entity manager with a flush`() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
|
||||
alice.rpc.startFlow(::EntityManagerSaveEntitiesWithAFlushFlow)
|
||||
.returnValue.getOrThrow(30.seconds)
|
||||
assertEquals(0, counter)
|
||||
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
|
||||
assertEquals(3, entities.size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `entities saved inside an entity manager are only committed when a flow suspends`() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
|
||||
var beforeCommitEntities: List<CustomTableEntity>? = null
|
||||
EntityManagerSaveEntitiesWithoutAFlushFlow.beforeCommitHook = {
|
||||
beforeCommitEntities = it
|
||||
}
|
||||
var afterCommitEntities: List<CustomTableEntity>? = null
|
||||
EntityManagerSaveEntitiesWithoutAFlushFlow.afterCommitHook = {
|
||||
afterCommitEntities = it
|
||||
}
|
||||
|
||||
alice.rpc.startFlow(::EntityManagerSaveEntitiesWithoutAFlushFlow)
|
||||
.returnValue.getOrThrow(30.seconds)
|
||||
assertEquals(0, counter)
|
||||
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
|
||||
assertEquals(3, entities.size)
|
||||
assertEquals(0, beforeCommitEntities!!.size)
|
||||
assertEquals(3, afterCommitEntities!!.size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation without a flush breaks`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerErrorWithoutAFlushFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 0
|
||||
)
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerErrorWithoutAFlushFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation with a flush breaks`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerErrorWithAFlushFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 0
|
||||
)
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerErrorWithAFlushFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation with a flush that is caught inside an entity manager block saves none of the data inside of it`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
// 1 entity saved from the first entity manager block that does not get rolled back
|
||||
// even if there is no intermediate commit to the database
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation with a flush that is caught outside the entity manager block saves none of the data inside of it`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
// 1 entity saved from the first entity manager block that does not get rolled back
|
||||
// even if there is no intermediate commit to the database
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation within a single entity manager block throws an exception and saves no data`() {
|
||||
var dischargeCounter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++dischargeCounter }
|
||||
val lock = Semaphore(0)
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> lock.release() }
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.startFlow(::EntityManagerErrorInsideASingleEntityManagerFlow)
|
||||
lock.acquire()
|
||||
// Goes straight to observation due to throwing [EntityExistsException]
|
||||
assertEquals(0, dischargeCounter)
|
||||
val entities = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow()
|
||||
assertEquals(0, entities.size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation on a single entity when saving multiple entities throws an exception and does not save any data within the entity manager block`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerSavingMultipleEntitiesWithASingleErrorFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 0
|
||||
)
|
||||
alice.rpc.expectFlowFailureAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerSavingMultipleEntitiesWithASingleErrorFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 3,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation on a single entity when saving multiple entities and catching the error does not save any data within the entity manager block`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
// 1 entity saved from the first entity manager block that does not get rolled back
|
||||
// even if there is no intermediate commit to the database
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation that is caught inside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 3
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 3
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation that is caught inside an entity manager and more data is saved afterwards inside the same entity manager should not save the extra data`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 1
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation that is caught outside an entity manager and more data is saved afterwards inside a new entity manager should save the extra data`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager,
|
||||
commitStatus = CommitStatus.NO_INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 3
|
||||
)
|
||||
alice.rpc.expectFlowSuccessAndAssertCreatedEntities(
|
||||
flow = ::EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager,
|
||||
commitStatus = CommitStatus.INTERMEDIATE_COMMIT,
|
||||
numberOfDischarges = 0,
|
||||
numberOfExpectedEntities = 3
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `constraint violation that is caught inside an entity manager should allow a flow to continue processing as normal`() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
driver(DriverParameters(startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val txId =
|
||||
alice.rpc.startFlow(::EntityManagerWithFlushCatchAndInteractWithOtherPartyFlow, bob.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
assertEquals(0, counter)
|
||||
val txFromVault = alice.rpc.stateMachineRecordedTransactionMappingSnapshot().firstOrNull()?.transactionId
|
||||
assertEquals(txId, txFromVault)
|
||||
val entity = alice.rpc.startFlow(::GetCustomEntities).returnValue.getOrThrow().single()
|
||||
assertEquals(entityWithIdOne, entity)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `data saved from an entity manager vault update should be visible within an entity manager block inside the same database transaction`() {
|
||||
MyService.includeRawUpdates = true
|
||||
MyService.insertionType = MyService.InsertionType.ENTITY_MANAGER
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
driver(DriverParameters(startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
|
||||
val entities =
|
||||
alice.rpc.startFlow(::EntityManagerWithinTheSameDatabaseTransactionFlow).returnValue.getOrThrow(20.seconds)
|
||||
assertEquals(3, entities.size)
|
||||
assertEquals(0, counter)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `data saved from a jdbc connection vault update should be visible within an entity manager block inside the same database transaction`() {
|
||||
MyService.includeRawUpdates = true
|
||||
MyService.insertionType = MyService.InsertionType.CONNECTION
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
|
||||
driver(DriverParameters(startNodesInProcess = true)) {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
|
||||
val entities =
|
||||
alice.rpc.startFlow(::EntityManagerWithinTheSameDatabaseTransactionFlow).returnValue.getOrThrow(20.seconds)
|
||||
assertEquals(3, entities.size)
|
||||
assertEquals(0, counter)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerSaveEntitiesWithoutAFlushFlow : FlowLogic<Unit>() {
|
||||
|
||||
companion object {
|
||||
var beforeCommitHook: ((entities: List<CustomTableEntity>) -> Unit)? = null
|
||||
var afterCommitHook: ((entities: List<CustomTableEntity>) -> Unit)? = null
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
persist(entityWithIdTwo)
|
||||
persist(entityWithIdThree)
|
||||
}
|
||||
beforeCommitHook?.invoke(serviceHub.cordaService(MyService::class.java).getEntities())
|
||||
sleep(1.millis)
|
||||
afterCommitHook?.invoke(serviceHub.cordaService(MyService::class.java).getEntities())
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerSaveEntitiesWithAFlushFlow : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
persist(entityWithIdTwo)
|
||||
persist(entityWithIdThree)
|
||||
flush()
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerErrorWithoutAFlushFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerErrorWithAFlushFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
flush()
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerWithAFlushCatchErrorInsideTheEntityManagerFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
try {
|
||||
flush()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerWithAFlushCatchErrorOutsideTheEntityManagerFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
try {
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
flush()
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerErrorInsideASingleEntityManagerFlow : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
persist(anotherEntityWithIdOne)
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerSavingMultipleEntitiesWithASingleErrorFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
persist(entityWithIdTwo)
|
||||
persist(entityWithIdThree)
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerSavingMultipleEntitiesWithASingleCaughtErrorFlow(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
persist(entityWithIdTwo)
|
||||
persist(entityWithIdThree)
|
||||
try {
|
||||
flush()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerCatchErrorAndSaveMoreEntitiesInANewEntityManager(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
try {
|
||||
flush()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdTwo)
|
||||
persist(entityWithIdThree)
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerCatchErrorAndSaveMoreEntitiesInTheSameEntityManager(private val commitStatus: CommitStatus) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
try {
|
||||
flush()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
// These entities are not saved since the transaction is marked for rollback
|
||||
try {
|
||||
persist(entityWithIdTwo)
|
||||
persist(entityWithIdThree)
|
||||
} catch (e: PersistenceException) {
|
||||
if (e.cause is ConstraintViolationException) {
|
||||
throw e
|
||||
} else {
|
||||
logger.info(
|
||||
"""
|
||||
Caught exception from second set of persists inside the same broken entity manager
|
||||
This happens if the database has thrown an exception due to rolling back the db transaction
|
||||
""".trimIndent(), e
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerCatchErrorOutsideTheEntityManagerAndSaveMoreEntitiesInANewEntityManager(private val commitStatus: CommitStatus) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
try {
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
if (commitStatus == CommitStatus.INTERMEDIATE_COMMIT) {
|
||||
sleep(1.millis)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdTwo)
|
||||
persist(entityWithIdThree)
|
||||
}
|
||||
sleep(1.millis)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerWithFlushCatchAndInteractWithOtherPartyFlow(private val party: Party) : FlowLogic<SecureHash>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SecureHash {
|
||||
serviceHub.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
}
|
||||
serviceHub.withEntityManager {
|
||||
persist(anotherEntityWithIdOne)
|
||||
try {
|
||||
flush()
|
||||
} catch (e: PersistenceException) {
|
||||
logger.info("Caught the exception!")
|
||||
}
|
||||
}
|
||||
return subFlow(CreateATransactionFlow(party))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
class CreateATransactionFlow(val party: Party) : FlowLogic<SecureHash>() {
|
||||
@Suspendable
|
||||
override fun call(): SecureHash {
|
||||
val session = initiateFlow(party)
|
||||
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
|
||||
addOutputState(DummyState(participants = listOf(ourIdentity, party)))
|
||||
addCommand(DummyCommandData, ourIdentity.owningKey, party.owningKey)
|
||||
}
|
||||
val stx = serviceHub.signInitialTransaction(tx)
|
||||
val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session)))
|
||||
return subFlow(FinalityFlow(ftx, session)).id
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(CreateATransactionFlow::class)
|
||||
class CreateATransactionResponder(val session: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val stx = subFlow(object : SignTransactionFlow(session) {
|
||||
override fun checkTransaction(stx: SignedTransaction) {
|
||||
}
|
||||
})
|
||||
subFlow(ReceiveFinalityFlow(session, stx.id))
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EntityManagerWithinTheSameDatabaseTransactionFlow : FlowLogic<List<CustomTableEntity>>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): List<CustomTableEntity> {
|
||||
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
|
||||
addOutputState(DummyState(participants = listOf(ourIdentity)))
|
||||
addCommand(DummyCommandData, ourIdentity.owningKey)
|
||||
}
|
||||
val stx = serviceHub.signInitialTransaction(tx)
|
||||
serviceHub.recordTransactions(stx)
|
||||
return serviceHub.withEntityManager {
|
||||
val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java)
|
||||
criteria.select(criteria.from(CustomTableEntity::class.java))
|
||||
createQuery(criteria).resultList
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@CordaService
|
||||
class MyService(private val services: AppServiceHub) : SingletonSerializeAsToken() {
|
||||
|
||||
companion object {
|
||||
var includeRawUpdates = false
|
||||
var insertionType = InsertionType.ENTITY_MANAGER
|
||||
}
|
||||
|
||||
enum class InsertionType { ENTITY_MANAGER, CONNECTION }
|
||||
|
||||
val executors: ExecutorService = Executors.newFixedThreadPool(1)
|
||||
|
||||
init {
|
||||
if (includeRawUpdates) {
|
||||
services.register {
|
||||
services.vaultService.rawUpdates.subscribe {
|
||||
if (insertionType == InsertionType.ENTITY_MANAGER) {
|
||||
services.withEntityManager {
|
||||
persist(entityWithIdOne)
|
||||
persist(entityWithIdTwo)
|
||||
persist(entityWithIdThree)
|
||||
}
|
||||
} else {
|
||||
services.jdbcSession().run {
|
||||
insert(entityWithIdOne)
|
||||
insert(entityWithIdTwo)
|
||||
insert(entityWithIdThree)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun Connection.insert(entity: CustomTableEntity) {
|
||||
prepareStatement("INSERT INTO $TABLE_NAME VALUES (?, ?, ?)").apply {
|
||||
setInt(1, entity.id)
|
||||
setString(2, entity.name)
|
||||
setString(3, entity.quote)
|
||||
}.executeUpdate()
|
||||
}
|
||||
|
||||
fun getEntities(): List<CustomTableEntity> {
|
||||
return executors.submit<List<CustomTableEntity>> {
|
||||
services.database.transaction {
|
||||
session.run {
|
||||
val criteria = criteriaBuilder.createQuery(CustomTableEntity::class.java)
|
||||
criteria.select(criteria.from(CustomTableEntity::class.java))
|
||||
createQuery(criteria).resultList
|
||||
}
|
||||
}
|
||||
}.get()
|
||||
}
|
||||
}
|
||||
}
|
@ -160,8 +160,10 @@ import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl
|
||||
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
import net.corda.nodeapi.internal.persistence.OutstandingDatabaseChangesException
|
||||
import net.corda.nodeapi.internal.persistence.RestrictedConnection
|
||||
import net.corda.nodeapi.internal.persistence.RestrictedEntityManager
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||
import net.corda.tools.shell.InteractiveShell
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
@ -176,6 +178,7 @@ import java.security.KeyPair
|
||||
import java.security.KeyStoreException
|
||||
import java.security.cert.X509Certificate
|
||||
import java.sql.Connection
|
||||
import java.sql.Savepoint
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.format.DateTimeParseException
|
||||
@ -189,6 +192,7 @@ import java.util.concurrent.TimeUnit.MINUTES
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import java.util.function.Consumer
|
||||
import javax.persistence.EntityManager
|
||||
import javax.persistence.PersistenceException
|
||||
import kotlin.collections.ArrayList
|
||||
|
||||
/**
|
||||
@ -1192,12 +1196,52 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
override fun jdbcSession(): Connection = RestrictedConnection(database.createSession())
|
||||
|
||||
override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T {
|
||||
return database.transaction {
|
||||
return database.transaction(useErrorHandler = false) {
|
||||
session.flush()
|
||||
block(restrictedEntityManager)
|
||||
val manager = entityManager
|
||||
withSavePoint { savepoint ->
|
||||
// Restrict what entity manager they can use inside the block
|
||||
try {
|
||||
block(RestrictedEntityManager(manager)).also {
|
||||
if (!manager.transaction.rollbackOnly) {
|
||||
manager.flush()
|
||||
} else {
|
||||
connection.rollback(savepoint)
|
||||
}
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
if (manager.transaction.rollbackOnly) {
|
||||
connection.rollback(savepoint)
|
||||
}
|
||||
throw e
|
||||
} finally {
|
||||
manager.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T: Any?> DatabaseTransaction.withSavePoint(block: (savepoint: Savepoint) -> T) : T {
|
||||
val savepoint = connection.setSavepoint()
|
||||
return try {
|
||||
block(savepoint)
|
||||
} finally {
|
||||
// Release the save point even if we occur an error
|
||||
if (savepoint.supportsReleasing()) {
|
||||
connection.releaseSavepoint(savepoint)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Not all databases support releasing of savepoints.
|
||||
* The type of savepoints are referenced by string names since we do not have access to the JDBC drivers
|
||||
* at compile time.
|
||||
*/
|
||||
private fun Savepoint.supportsReleasing(): Boolean {
|
||||
return this::class.simpleName != "SQLServerSavepoint" && this::class.simpleName != "OracleSavepoint"
|
||||
}
|
||||
|
||||
override fun withEntityManager(block: Consumer<EntityManager>) {
|
||||
withEntityManager {
|
||||
block.accept(this)
|
||||
|
@ -229,7 +229,10 @@ class ActionExecutorImpl(
|
||||
|
||||
@Suspendable
|
||||
private fun executeRollbackTransaction() {
|
||||
contextTransactionOrNull?.close()
|
||||
contextTransactionOrNull?.run {
|
||||
rollback()
|
||||
close()
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
|
@ -43,7 +43,10 @@ class TransitionExecutorImpl(
|
||||
try {
|
||||
actionExecutor.executeAction(fiber, action)
|
||||
} catch (exception: Exception) {
|
||||
contextTransactionOrNull?.close()
|
||||
contextTransactionOrNull?.run {
|
||||
rollback()
|
||||
close()
|
||||
}
|
||||
if (transition.newState.checkpoint.errorState is ErrorState.Errored) {
|
||||
// If we errored while transitioning to an error state then we cannot record the additional
|
||||
// error as that may result in an infinite loop, e.g. error propagation fails -> record error -> propagate fails again.
|
||||
|
@ -5,6 +5,7 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.RolledBackDatabaseSessionException
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
@ -180,6 +181,8 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
|
||||
} catch (t: PersistenceException) {
|
||||
// This only helps if thrown on commit, otherwise other latches not counted down.
|
||||
assertEquals(t.message, Outcome.SuccessButErrorOnCommit, a.outcome)
|
||||
} catch (t: RolledBackDatabaseSessionException) {
|
||||
assertEquals(t.message, Outcome.SuccessButErrorOnCommit, a.outcome)
|
||||
}
|
||||
a.await(a::phase4)
|
||||
b.await(b::phase4)
|
||||
|
@ -1,7 +1,12 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveFinalityFlow
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.queryBy
|
||||
@ -50,13 +55,11 @@ class VaultFlowTest {
|
||||
@After
|
||||
fun tearDown() {
|
||||
mockNetwork.stopNodes()
|
||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `Unique column constraint failing causes states to not persist to vaults`() {
|
||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add( { t: Throwable -> t is javax.persistence.PersistenceException })
|
||||
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
|
||||
val hospitalLatch = CountDownLatch(1)
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> hospitalLatch.countDown() }
|
||||
|
@ -236,11 +236,11 @@ open class MockServices private constructor(
|
||||
override fun jdbcSession(): Connection = persistence.createSession()
|
||||
|
||||
override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T {
|
||||
return block(contextTransaction.restrictedEntityManager)
|
||||
return block(contextTransaction.entityManager)
|
||||
}
|
||||
|
||||
override fun withEntityManager(block: Consumer<EntityManager>) {
|
||||
return block.accept(contextTransaction.restrictedEntityManager)
|
||||
return block.accept(contextTransaction.entityManager)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user