mirror of
https://github.com/corda/corda.git
synced 2025-06-19 15:43:52 +00:00
CORDA-3329 Exceptions thrown in raw vault observers can cause critical issues (#5816)
Observers registered on NodeVaultService#rawUpdates, if they throw an exception when called from serviceHub#recordTransactions and if this exception is not handled by the flow hospital, then this leads to the transaction not being recorded in the local vault. This could get the ledger in an out of sync state. In the specific case this happens within FinalityFlow#notariseAndRecord this leads to the transaction being notarized but not recorded in the local vault nor broadcasted in any counter party. The -failed to be recorded locally- transaction and its output states are not visible to any vault, and its input states not able to consumed by a new transaction, since they are recorded as consumed within the Notary. In this specific case we need not loose, by any means, the current transaction. We will handle all cases by catching all exceptions thrown from serviceHub#recordTransactions, wrapping them with a HospitalizeFlowException and throwing it instead. The flow will get to the hospital for observation to be retried from previous checkpoint on next node restart.
This commit is contained in:
committed by
Rick Parker
parent
a4d63d1329
commit
7f62046c2f
@ -173,6 +173,7 @@
|
|||||||
<ID>ComplexMethod:NodeNamedCache.kt$DefaultNamedCacheFactory$open protected fun <K, V> configuredForNamed(caffeine: Caffeine<K, V>, name: String): Caffeine<K, V></ID>
|
<ID>ComplexMethod:NodeNamedCache.kt$DefaultNamedCacheFactory$open protected fun <K, V> configuredForNamed(caffeine: Caffeine<K, V>, name: String): Caffeine<K, V></ID>
|
||||||
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$@Throws(VaultQueryException::class) private fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging_: PageSpecification, sorting: Sort, contractStateType: Class<out T>, skipPagingChecks: Boolean): Vault.Page<T></ID>
|
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$@Throws(VaultQueryException::class) private fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging_: PageSpecification, sorting: Sort, contractStateType: Class<out T>, skipPagingChecks: Boolean): Vault.Page<T></ID>
|
||||||
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord, previouslySeen: Boolean): List<Vault.Update<ContractState>></ID>
|
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord, previouslySeen: Boolean): List<Vault.Update<ContractState>></ID>
|
||||||
|
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$private fun processAndNotify(updates: List<Vault.Update<ContractState>>)</ID>
|
||||||
<ID>ComplexMethod:ObjectDiffer.kt$ObjectDiffer$fun diff(a: Any?, b: Any?): DiffTree?</ID>
|
<ID>ComplexMethod:ObjectDiffer.kt$ObjectDiffer$fun diff(a: Any?, b: Any?): DiffTree?</ID>
|
||||||
<ID>ComplexMethod:Obligation.kt$Obligation$override fun verify(tx: LedgerTransaction)</ID>
|
<ID>ComplexMethod:Obligation.kt$Obligation$override fun verify(tx: LedgerTransaction)</ID>
|
||||||
<ID>ComplexMethod:QuasarInstrumentationHook.kt$QuasarInstrumentationHookAgent.Companion$@JvmStatic fun premain(argumentsString: String?, instrumentation: Instrumentation)</ID>
|
<ID>ComplexMethod:QuasarInstrumentationHook.kt$QuasarInstrumentationHookAgent.Companion$@JvmStatic fun premain(argumentsString: String?, instrumentation: Instrumentation)</ID>
|
||||||
@ -196,6 +197,7 @@
|
|||||||
<ID>ComplexMethod:TlsDiffAlgorithmsTest.kt$TlsDiffAlgorithmsTest$@Test fun testClientServerTlsExchange()</ID>
|
<ID>ComplexMethod:TlsDiffAlgorithmsTest.kt$TlsDiffAlgorithmsTest$@Test fun testClientServerTlsExchange()</ID>
|
||||||
<ID>ComplexMethod:TlsDiffProtocolsTest.kt$TlsDiffProtocolsTest$@Test fun testClientServerTlsExchange()</ID>
|
<ID>ComplexMethod:TlsDiffProtocolsTest.kt$TlsDiffProtocolsTest$@Test fun testClientServerTlsExchange()</ID>
|
||||||
<ID>ComplexMethod:TransactionUtils.kt$ fun createComponentGroups(inputs: List<StateRef>, outputs: List<TransactionState<ContractState>>, commands: List<Command<*>>, attachments: List<SecureHash>, notary: Party?, timeWindow: TimeWindow?, references: List<StateRef>, networkParametersHash: SecureHash?): List<ComponentGroup></ID>
|
<ID>ComplexMethod:TransactionUtils.kt$ fun createComponentGroups(inputs: List<StateRef>, outputs: List<TransactionState<ContractState>>, commands: List<Command<*>>, attachments: List<SecureHash>, notary: Party?, timeWindow: TimeWindow?, references: List<StateRef>, networkParametersHash: SecureHash?): List<ComponentGroup></ID>
|
||||||
|
<ID>ComplexMethod:TransitionExecutorImpl.kt$TransitionExecutorImpl$@Suppress("NestedBlockDepth", "ReturnCount") @Suspendable override fun executeTransition( fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor ): Pair<FlowContinuation, StateMachineState></ID>
|
||||||
<ID>ComplexMethod:TypeModellingFingerPrinter.kt$FingerPrintingState$// For a type we haven't seen before, determine the correct path depending on the type of type it is. private fun fingerprintNewType(type: LocalTypeInformation)</ID>
|
<ID>ComplexMethod:TypeModellingFingerPrinter.kt$FingerPrintingState$// For a type we haven't seen before, determine the correct path depending on the type of type it is. private fun fingerprintNewType(type: LocalTypeInformation)</ID>
|
||||||
<ID>ComplexMethod:UniversalContract.kt$UniversalContract$override fun verify(tx: LedgerTransaction)</ID>
|
<ID>ComplexMethod:UniversalContract.kt$UniversalContract$override fun verify(tx: LedgerTransaction)</ID>
|
||||||
<ID>ComplexMethod:Util.kt$fun <T> debugCompare(perLeft: Perceivable<T>, perRight: Perceivable<T>)</ID>
|
<ID>ComplexMethod:Util.kt$fun <T> debugCompare(perLeft: Perceivable<T>, perRight: Perceivable<T>)</ID>
|
||||||
@ -3975,6 +3977,7 @@
|
|||||||
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$exception: Throwable</ID>
|
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$exception: Throwable</ID>
|
||||||
<ID>TooGenericExceptionCaught:DriverTests.kt$DriverTests$e: Exception</ID>
|
<ID>TooGenericExceptionCaught:DriverTests.kt$DriverTests$e: Exception</ID>
|
||||||
<ID>TooGenericExceptionCaught:ErrorCodeLoggingTests.kt$e: Exception</ID>
|
<ID>TooGenericExceptionCaught:ErrorCodeLoggingTests.kt$e: Exception</ID>
|
||||||
|
<ID>TooGenericExceptionCaught:ErrorHandling.kt$ErrorHandling.CheckpointAfterErrorFlow$t: Throwable</ID>
|
||||||
<ID>TooGenericExceptionCaught:EventProcessor.kt$EventProcessor$ex: Exception</ID>
|
<ID>TooGenericExceptionCaught:EventProcessor.kt$EventProcessor$ex: Exception</ID>
|
||||||
<ID>TooGenericExceptionCaught:Eventually.kt$e: Exception</ID>
|
<ID>TooGenericExceptionCaught:Eventually.kt$e: Exception</ID>
|
||||||
<ID>TooGenericExceptionCaught:Expect.kt$exception: Exception</ID>
|
<ID>TooGenericExceptionCaught:Expect.kt$exception: Exception</ID>
|
||||||
@ -4109,6 +4112,7 @@
|
|||||||
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from $issuer, " + "however they only have $issuerQuantity!" )</ID>
|
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from $issuer, " + "however they only have $issuerQuantity!" )</ID>
|
||||||
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however there is no cash from $issuer!" )</ID>
|
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however there is no cash from $issuer!" )</ID>
|
||||||
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however they only have $senderQuantity!" )</ID>
|
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however they only have $senderQuantity!" )</ID>
|
||||||
|
<ID>TooGenericExceptionThrown:DbListenerService.kt$DbListenerService$throw Exception("Mother of all exceptions")</ID>
|
||||||
<ID>TooGenericExceptionThrown:FlowAsyncOperationTests.kt$FlowAsyncOperationTests.ErroredExecute$throw Exception()</ID>
|
<ID>TooGenericExceptionThrown:FlowAsyncOperationTests.kt$FlowAsyncOperationTests.ErroredExecute$throw Exception()</ID>
|
||||||
<ID>TooGenericExceptionThrown:FlowFrameworkTests.kt$FlowFrameworkTests$throw Exception("Error")</ID>
|
<ID>TooGenericExceptionThrown:FlowFrameworkTests.kt$FlowFrameworkTests$throw Exception("Error")</ID>
|
||||||
<ID>TooGenericExceptionThrown:Generator.kt$Generator$throw Exception("Failed to generate", error)</ID>
|
<ID>TooGenericExceptionThrown:Generator.kt$Generator$throw Exception("Failed to generate", error)</ID>
|
||||||
|
@ -4,6 +4,7 @@ import co.paralleluniverse.strands.Strand
|
|||||||
import com.zaxxer.hikari.HikariDataSource
|
import com.zaxxer.hikari.HikariDataSource
|
||||||
import com.zaxxer.hikari.pool.HikariPool
|
import com.zaxxer.hikari.pool.HikariPool
|
||||||
import com.zaxxer.hikari.util.ConcurrentBag
|
import com.zaxxer.hikari.util.ConcurrentBag
|
||||||
|
import net.corda.core.flows.HospitalizeFlowException
|
||||||
import net.corda.core.internal.NamedCacheFactory
|
import net.corda.core.internal.NamedCacheFactory
|
||||||
import net.corda.core.schemas.MappedSchema
|
import net.corda.core.schemas.MappedSchema
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
@ -100,7 +101,7 @@ class CordaPersistence(
|
|||||||
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(),
|
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(),
|
||||||
customClassLoader: ClassLoader? = null,
|
customClassLoader: ClassLoader? = null,
|
||||||
val closeConnection: Boolean = true,
|
val closeConnection: Boolean = true,
|
||||||
val errorHandler: (t: Throwable) -> Unit = {}
|
val errorHandler: DatabaseTransaction.(e: Exception) -> Unit = {}
|
||||||
) : Closeable {
|
) : Closeable {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
@ -191,17 +192,17 @@ class CordaPersistence(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun createSession(): Connection {
|
fun createSession(): Connection {
|
||||||
|
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||||
|
_contextDatabase.set(this)
|
||||||
|
val transaction = contextTransaction
|
||||||
try {
|
try {
|
||||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
transaction.session.flush()
|
||||||
_contextDatabase.set(this)
|
return transaction.connection
|
||||||
currentDBSession().flush()
|
} catch (e: Exception) {
|
||||||
return contextTransaction.connection
|
if (e is SQLException || e is PersistenceException) {
|
||||||
} catch (sqlException: SQLException) {
|
transaction.errorHandler(e)
|
||||||
errorHandler(sqlException)
|
}
|
||||||
throw sqlException
|
throw e
|
||||||
} catch (persistenceException: PersistenceException) {
|
|
||||||
errorHandler(persistenceException)
|
|
||||||
throw persistenceException
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,18 +231,22 @@ class CordaPersistence(
|
|||||||
recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T {
|
recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T {
|
||||||
_contextDatabase.set(this)
|
_contextDatabase.set(this)
|
||||||
val outer = contextTransactionOrNull
|
val outer = contextTransactionOrNull
|
||||||
try {
|
return if (outer != null) {
|
||||||
return if (outer != null) {
|
// we only need to handle errors coming out of inner transactions because,
|
||||||
|
// a. whenever this code is being executed within the flow state machine, a top level transaction should have
|
||||||
|
// previously been created by the flow state machine in ActionExecutorImpl#executeCreateTransaction
|
||||||
|
// b. exceptions coming out from top level transactions are already being handled in CordaPersistence#inTopLevelTransaction
|
||||||
|
// i.e. roll back and close the transaction
|
||||||
|
try {
|
||||||
outer.statement()
|
outer.statement()
|
||||||
} else {
|
} catch (e: Exception) {
|
||||||
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
|
if (e is SQLException || e is PersistenceException || e is HospitalizeFlowException) {
|
||||||
|
outer.errorHandler(e)
|
||||||
|
}
|
||||||
|
throw e
|
||||||
}
|
}
|
||||||
} catch (sqlException: SQLException) {
|
} else {
|
||||||
errorHandler(sqlException)
|
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
|
||||||
throw sqlException
|
|
||||||
} catch (persistenceException: PersistenceException) {
|
|
||||||
errorHandler(persistenceException)
|
|
||||||
throw persistenceException
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package net.corda.nodeapi.internal.persistence
|
package net.corda.nodeapi.internal.persistence
|
||||||
|
|
||||||
import co.paralleluniverse.strands.Strand
|
import co.paralleluniverse.strands.Strand
|
||||||
import org.hibernate.BaseSessionEventListener
|
import net.corda.core.CordaRuntimeException
|
||||||
import org.hibernate.Session
|
import org.hibernate.Session
|
||||||
import org.hibernate.Transaction
|
import org.hibernate.Transaction
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
@ -51,7 +51,27 @@ class DatabaseTransaction(
|
|||||||
private var committed = false
|
private var committed = false
|
||||||
private var closed = false
|
private var closed = false
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds the first exception thrown from a series of statements executed in the same [DatabaseTransaction].
|
||||||
|
* The purpose of this property is to make sure this exception cannot be suppressed in user code.
|
||||||
|
* The exception will be thrown on the next [commit]. It is used only inside a flow state machine execution.
|
||||||
|
*/
|
||||||
|
private var firstExceptionInDatabaseTransaction: Exception? = null
|
||||||
|
|
||||||
|
fun setException(e: Exception) {
|
||||||
|
if (firstExceptionInDatabaseTransaction == null) {
|
||||||
|
firstExceptionInDatabaseTransaction = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun clearException() {
|
||||||
|
firstExceptionInDatabaseTransaction = null
|
||||||
|
}
|
||||||
|
|
||||||
fun commit() {
|
fun commit() {
|
||||||
|
firstExceptionInDatabaseTransaction?.let {
|
||||||
|
throw DatabaseTransactionException(it)
|
||||||
|
}
|
||||||
if (sessionDelegate.isInitialized()) {
|
if (sessionDelegate.isInitialized()) {
|
||||||
hibernateTransaction.commit()
|
hibernateTransaction.commit()
|
||||||
}
|
}
|
||||||
@ -66,16 +86,18 @@ class DatabaseTransaction(
|
|||||||
if (!connection.isClosed) {
|
if (!connection.isClosed) {
|
||||||
connection.rollback()
|
connection.rollback()
|
||||||
}
|
}
|
||||||
|
clearException()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun close() {
|
fun close() {
|
||||||
if (sessionDelegate.isInitialized() && session.isOpen) {
|
if (sessionDelegate.isInitialized() && session.isOpen) {
|
||||||
session.close()
|
session.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
if (database.closeConnection) {
|
if (database.closeConnection) {
|
||||||
connection.close()
|
connection.close()
|
||||||
}
|
}
|
||||||
|
clearException()
|
||||||
|
|
||||||
contextTransactionOrNull = outerTransaction
|
contextTransactionOrNull = outerTransaction
|
||||||
if (outerTransaction == null) {
|
if (outerTransaction == null) {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
@ -99,3 +121,7 @@ class DatabaseTransaction(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper exception, for any exception registered as [DatabaseTransaction.firstExceptionInDatabaseTransaction].
|
||||||
|
*/
|
||||||
|
class DatabaseTransactionException(override val cause: Throwable): CordaRuntimeException(cause.message, cause)
|
@ -1,9 +1,13 @@
|
|||||||
package net.corda.node.services.vault
|
package net.corda.node.services.vault
|
||||||
|
|
||||||
|
import co.paralleluniverse.strands.concurrent.Semaphore
|
||||||
import com.r3.dbfailure.workflows.CreateStateFlow
|
import com.r3.dbfailure.workflows.CreateStateFlow
|
||||||
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
|
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
|
||||||
import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum
|
import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum
|
||||||
import net.corda.core.CordaRuntimeException
|
import com.r3.dbfailure.workflows.DbListenerService
|
||||||
|
import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow
|
||||||
|
import com.r3.transactionfailure.workflows.ErrorHandling
|
||||||
|
import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.core.messaging.startFlow
|
import net.corda.core.messaging.startFlow
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
@ -13,19 +17,20 @@ import net.corda.node.services.Permissions
|
|||||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.driver.DriverParameters
|
import net.corda.testing.driver.DriverParameters
|
||||||
|
import net.corda.testing.driver.OutOfProcess
|
||||||
import net.corda.testing.driver.driver
|
import net.corda.testing.driver.driver
|
||||||
import net.corda.testing.node.User
|
import net.corda.testing.node.User
|
||||||
import net.corda.testing.node.internal.findCordapp
|
import net.corda.testing.node.internal.findCordapp
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Assert
|
import org.junit.Assert
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import rx.exceptions.OnErrorNotImplementedException
|
import java.lang.IllegalStateException
|
||||||
import java.sql.SQLException
|
import java.sql.SQLException
|
||||||
import java.time.Duration
|
import java.util.concurrent.TimeUnit
|
||||||
import java.time.temporal.ChronoUnit
|
|
||||||
import java.util.concurrent.TimeoutException
|
import java.util.concurrent.TimeoutException
|
||||||
import javax.persistence.PersistenceException
|
import javax.persistence.PersistenceException
|
||||||
import kotlin.test.assertFailsWith
|
import kotlin.test.assertFailsWith
|
||||||
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
class VaultObserverExceptionTest {
|
class VaultObserverExceptionTest {
|
||||||
companion object {
|
companion object {
|
||||||
@ -43,6 +48,7 @@ class VaultObserverExceptionTest {
|
|||||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
|
||||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
||||||
StaffedFlowHospital.onFlowAdmitted.clear()
|
StaffedFlowHospital.onFlowAdmitted.clear()
|
||||||
|
DbListenerService.onError = null
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -50,12 +56,11 @@ class VaultObserverExceptionTest {
|
|||||||
* DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation
|
* DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
fun unhandledSqlExceptionFromVaultObserverGetsHospitatlised() {
|
fun unhandledSqlExceptionFromVaultObserverGetsHospitalised() {
|
||||||
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
|
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
|
||||||
|
|
||||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||||
when (it) {
|
when (it) {
|
||||||
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
|
|
||||||
is SQLException -> {
|
is SQLException -> {
|
||||||
testControlFuture.complete(true)
|
testControlFuture.complete(true)
|
||||||
}
|
}
|
||||||
@ -80,58 +85,17 @@ class VaultObserverExceptionTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Throwing a random (non-SQL releated) exception from a vault observer causes the flow to be
|
* None exception thrown from a vault observer can be suppressible in the flow that triggered the observer
|
||||||
* aborted when unhandled in user code
|
* because the recording of transaction states failed. The flow will be hospitalized.
|
||||||
|
* The exception will bring the rx.Observer down.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
fun otherExceptionsFromVaultObserverBringFlowDown() {
|
fun exceptionFromVaultObserverCannotBeSuppressedInFlow() {
|
||||||
driver(DriverParameters(
|
|
||||||
startNodesInProcess = true,
|
|
||||||
cordappsForAllNodes = testCordapps())) {
|
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
|
||||||
assertFailsWith(CordaRuntimeException::class, "Toys out of pram") {
|
|
||||||
aliceNode.rpc.startFlow(
|
|
||||||
::Initiator,
|
|
||||||
"InvalidParameterException",
|
|
||||||
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter)
|
|
||||||
).returnValue.getOrThrow(30.seconds)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A random exception from a VaultObserver will bring the Rx Observer down, but can be handled in the flow
|
|
||||||
* triggering the observer, and the flow will continue successfully (for some values of success)
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
fun otherExceptionsFromVaultObserverCanBeSuppressedInFlow() {
|
|
||||||
driver(DriverParameters(
|
|
||||||
startNodesInProcess = true,
|
|
||||||
cordappsForAllNodes = testCordapps())) {
|
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
|
||||||
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
|
|
||||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
|
|
||||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
|
||||||
.returnValue.getOrThrow(30.seconds)
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If the state we are trying to persist triggers a persistence exception, the flow hospital will retry the flow
|
|
||||||
* and keep it in for observation if errors persist.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
fun persistenceExceptionOnCommitGetsRetriedAndThenGetsKeptForObservation() {
|
|
||||||
var admitted = 0
|
|
||||||
var observation = 0
|
var observation = 0
|
||||||
StaffedFlowHospital.onFlowAdmitted.add {
|
val waitUntilHospitalised = Semaphore(0)
|
||||||
++admitted
|
|
||||||
}
|
|
||||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
|
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
|
||||||
++observation
|
++observation
|
||||||
|
waitUntilHospitalised.release()
|
||||||
}
|
}
|
||||||
|
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
@ -139,25 +103,52 @@ class VaultObserverExceptionTest {
|
|||||||
cordappsForAllNodes = testCordapps())) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
assertFailsWith<TimeoutException> {
|
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
|
||||||
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState))
|
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions,
|
||||||
.returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS))
|
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||||
}
|
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
|
||||||
}
|
}
|
||||||
Assert.assertTrue("Exception from service has not been to Hospital", admitted > 0)
|
|
||||||
Assert.assertEquals(1, observation)
|
Assert.assertEquals(1, observation)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If we have a state causing a database error lined up for persistence, calling jdbConnection() in
|
* None runtime exception thrown from a vault observer can be suppressible in the flow that triggered the observer
|
||||||
* the vault observer will trigger a flush that throws. This will be kept in for observation.
|
* because the recording of transaction states failed. The flow will be hospitalized.
|
||||||
|
* The exception will bring the rx.Observer down.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
fun persistenceExceptionOnFlushGetsRetriedAndThenGetsKeptForObservation() {
|
fun runtimeExceptionFromVaultObserverCannotBeSuppressedInFlow() {
|
||||||
|
var observation = 0
|
||||||
|
val waitUntilHospitalised = Semaphore(0)
|
||||||
|
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
|
||||||
|
++observation
|
||||||
|
waitUntilHospitalised.release()
|
||||||
|
}
|
||||||
|
|
||||||
|
driver(DriverParameters(
|
||||||
|
startNodesInProcess = true,
|
||||||
|
cordappsForAllNodes = testCordapps())) {
|
||||||
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
|
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
|
||||||
|
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
|
||||||
|
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||||
|
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(1, observation)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If we have a state causing a persistence exception during record transactions (in NodeVaultService#processAndNotify),
|
||||||
|
* the flow will be kept in for observation.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
fun persistenceExceptionDuringRecordTransactionsGetsKeptForObservation() {
|
||||||
var counter = 0
|
var counter = 0
|
||||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||||
when (it) {
|
when (it) {
|
||||||
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
|
|
||||||
is PersistenceException -> {
|
is PersistenceException -> {
|
||||||
++counter
|
++counter
|
||||||
log.info("Got a PersistentException in the flow hospital count = $counter")
|
log.info("Got a PersistentException in the flow hospital count = $counter")
|
||||||
@ -177,7 +168,6 @@ class VaultObserverExceptionTest {
|
|||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
assertFailsWith<TimeoutException>("PersistenceException") {
|
assertFailsWith<TimeoutException>("PersistenceException") {
|
||||||
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(
|
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(
|
||||||
CreateStateFlow.ErrorTarget.ServiceValidUpdate,
|
|
||||||
CreateStateFlow.ErrorTarget.TxInvalidState))
|
CreateStateFlow.ErrorTarget.TxInvalidState))
|
||||||
.returnValue.getOrThrow(30.seconds)
|
.returnValue.getOrThrow(30.seconds)
|
||||||
}
|
}
|
||||||
@ -187,56 +177,15 @@ class VaultObserverExceptionTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If we have a state causing a database error lined up for persistence, calling jdbConnection() in
|
* If we have a state causing a persistence exception during record transactions (in NodeVaultService#processAndNotify),
|
||||||
* the vault observer will trigger a flush that throws.
|
* trying to catch and suppress that exception inside the flow does protect the flow, but the new
|
||||||
* Trying to catch and suppress that exception in the flow around the code triggering the vault observer
|
* interceptor will fail the flow anyway. The flow will be kept in for observation.
|
||||||
* does not change the outcome - the first exception in the service will bring the service down and will
|
|
||||||
* be caught by the flow, but the state machine will error the flow anyway as Corda code threw.
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
fun persistenceExceptionOnFlushInVaultObserverCannotBeSuppressedInFlow() {
|
fun persistenceExceptionDuringRecordTransactionsCannotBeSuppressedInFlow() {
|
||||||
var counter = 0
|
var counter = 0
|
||||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||||
when (it) {
|
when (it) {
|
||||||
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
|
|
||||||
is PersistenceException -> {
|
|
||||||
++counter
|
|
||||||
log.info("Got a PersistentException in the flow hospital count = $counter")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
driver(DriverParameters(
|
|
||||||
startNodesInProcess = true,
|
|
||||||
cordappsForAllNodes = testCordapps())) {
|
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
|
||||||
val flowHandle = aliceNode.rpc.startFlow(
|
|
||||||
::Initiator,
|
|
||||||
"EntityManager",
|
|
||||||
CreateStateFlow.errorTargetsToNum(
|
|
||||||
CreateStateFlow.ErrorTarget.ServiceValidUpdate,
|
|
||||||
CreateStateFlow.ErrorTarget.TxInvalidState,
|
|
||||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
|
||||||
val flowResult = flowHandle.returnValue
|
|
||||||
assertFailsWith<TimeoutException>("PersistenceException") { flowResult.getOrThrow(30.seconds) }
|
|
||||||
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If we have a state causing a persistence exception lined up for persistence, calling jdbConnection() in
|
|
||||||
* the vault observer will trigger a flush that throws.
|
|
||||||
* Trying to catch and suppress that exception inside the service does protect the service, but the new
|
|
||||||
* interceptor will fail the flow anyway. The flow will be kept in for observation if errors persist.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
fun persistenceExceptionOnFlushInVaultObserverCannotBeSuppressedInService() {
|
|
||||||
var counter = 0
|
|
||||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
|
||||||
when (it) {
|
|
||||||
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
|
|
||||||
is PersistenceException -> {
|
is PersistenceException -> {
|
||||||
++counter
|
++counter
|
||||||
log.info("Got a PersistentException in the flow hospital count = $counter")
|
log.info("Got a PersistentException in the flow hospital count = $counter")
|
||||||
@ -253,9 +202,8 @@ class VaultObserverExceptionTest {
|
|||||||
val flowHandle = aliceNode.rpc.startFlow(
|
val flowHandle = aliceNode.rpc.startFlow(
|
||||||
::Initiator, "EntityManager",
|
::Initiator, "EntityManager",
|
||||||
CreateStateFlow.errorTargetsToNum(
|
CreateStateFlow.errorTargetsToNum(
|
||||||
CreateStateFlow.ErrorTarget.ServiceValidUpdate,
|
|
||||||
CreateStateFlow.ErrorTarget.TxInvalidState,
|
CreateStateFlow.ErrorTarget.TxInvalidState,
|
||||||
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
|
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||||
val flowResult = flowHandle.returnValue
|
val flowResult = flowHandle.returnValue
|
||||||
assertFailsWith<TimeoutException>("PersistenceException") { flowResult.getOrThrow(30.seconds) }
|
assertFailsWith<TimeoutException>("PersistenceException") { flowResult.getOrThrow(30.seconds) }
|
||||||
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
||||||
@ -310,4 +258,109 @@ class VaultObserverExceptionTest {
|
|||||||
flowResult.getOrThrow(30.seconds)
|
flowResult.getOrThrow(30.seconds)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exceptions thrown from a vault observer ,are now wrapped and rethrown as a HospitalizeFlowException.
|
||||||
|
* The flow should get hospitalised and any potential following checkpoint should fail.
|
||||||
|
* In case of a SQLException or PersistenceException, this was already "breaking" the database transaction
|
||||||
|
* and therefore, the next checkpoint was failing.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
fun `attempt to checkpoint, following an error thrown in vault observer which gets supressed in flow, will fail`() {
|
||||||
|
var counterBeforeFirstCheckpoint = 0
|
||||||
|
var counterAfterFirstCheckpoint = 0
|
||||||
|
var counterAfterSecondCheckpoint = 0
|
||||||
|
|
||||||
|
ErrorHandling.hookBeforeFirstCheckpoint = { counterBeforeFirstCheckpoint++ }
|
||||||
|
ErrorHandling.hookAfterFirstCheckpoint = { counterAfterFirstCheckpoint++ }
|
||||||
|
ErrorHandling.hookAfterSecondCheckpoint = { counterAfterSecondCheckpoint++ }
|
||||||
|
|
||||||
|
val waitUntilHospitalised = Semaphore(0)
|
||||||
|
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
|
||||||
|
waitUntilHospitalised.release()
|
||||||
|
}
|
||||||
|
|
||||||
|
driver(DriverParameters(
|
||||||
|
inMemoryDB = false,
|
||||||
|
startNodesInProcess = true,
|
||||||
|
isDebug = true,
|
||||||
|
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"),
|
||||||
|
findCordapp("com.r3.dbfailure.workflows"),
|
||||||
|
findCordapp("com.r3.transactionfailure.workflows"),
|
||||||
|
findCordapp("com.r3.dbfailure.schemas")))) {
|
||||||
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
|
val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
|
|
||||||
|
node.rpc.startFlow(::CheckpointAfterErrorFlow, CreateStateFlow.errorTargetsToNum(
|
||||||
|
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, // throw not persistence exception
|
||||||
|
CreateStateFlow.ErrorTarget.FlowSwallowErrors
|
||||||
|
)
|
||||||
|
)
|
||||||
|
waitUntilHospitalised.acquire()
|
||||||
|
|
||||||
|
// restart node, see if flow retries from correct checkpoint
|
||||||
|
node.stop()
|
||||||
|
startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
|
waitUntilHospitalised.acquire()
|
||||||
|
|
||||||
|
// check flow retries from correct checkpoint
|
||||||
|
assertTrue(counterBeforeFirstCheckpoint == 1)
|
||||||
|
assertTrue(counterAfterFirstCheckpoint == 2)
|
||||||
|
assertTrue(counterAfterSecondCheckpoint == 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `vault observer failing with OnErrorFailedException gets hospitalised`() {
|
||||||
|
DbListenerService.onError = {
|
||||||
|
log.info("Error in rx.Observer#OnError! - Observer will fail with OnErrorFailedException")
|
||||||
|
throw it
|
||||||
|
}
|
||||||
|
|
||||||
|
var observation = 0
|
||||||
|
val waitUntilHospitalised = Semaphore(0)
|
||||||
|
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
|
||||||
|
++observation
|
||||||
|
waitUntilHospitalised.release()
|
||||||
|
}
|
||||||
|
|
||||||
|
driver(DriverParameters(
|
||||||
|
startNodesInProcess = true,
|
||||||
|
cordappsForAllNodes = testCordapps())) {
|
||||||
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
|
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
|
||||||
|
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
|
||||||
|
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||||
|
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(1, observation)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `out of memory error halts JVM, on node restart flow retries, and succeeds`() {
|
||||||
|
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = testCordapps())) {
|
||||||
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = false).getOrThrow()
|
||||||
|
aliceNode.rpc.startFlow(::MakeServiceThrowErrorFlow).returnValue.getOrThrow()
|
||||||
|
aliceNode.rpc.startFlow(::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum(
|
||||||
|
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError))
|
||||||
|
|
||||||
|
val terminated = (aliceNode as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS)
|
||||||
|
if (terminated) {
|
||||||
|
aliceNode.stop()
|
||||||
|
// starting node within the same process this time to take advantage of threads sharing same heap space
|
||||||
|
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
|
||||||
|
CreateStateFlow.Initiator.onExitingCall = {
|
||||||
|
testControlFuture.complete(true)
|
||||||
|
}
|
||||||
|
startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = true).getOrThrow()
|
||||||
|
assert(testControlFuture.getOrThrow(30.seconds))
|
||||||
|
} else {
|
||||||
|
throw IllegalStateException("Out of process node is still up and running!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -118,7 +118,6 @@ import net.corda.node.services.persistence.PublicKeyToOwningIdentityCacheImpl
|
|||||||
import net.corda.node.services.persistence.PublicKeyToTextConverter
|
import net.corda.node.services.persistence.PublicKeyToTextConverter
|
||||||
import net.corda.node.services.rpc.CheckpointDumperImpl
|
import net.corda.node.services.rpc.CheckpointDumperImpl
|
||||||
import net.corda.node.services.schema.NodeSchemaService
|
import net.corda.node.services.schema.NodeSchemaService
|
||||||
import net.corda.node.services.statemachine.Event
|
|
||||||
import net.corda.node.services.statemachine.ExternalEvent
|
import net.corda.node.services.statemachine.ExternalEvent
|
||||||
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
||||||
import net.corda.node.services.statemachine.FlowMonitor
|
import net.corda.node.services.statemachine.FlowMonitor
|
||||||
@ -1165,6 +1164,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
|||||||
|
|
||||||
override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T {
|
override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T {
|
||||||
return database.transaction {
|
return database.transaction {
|
||||||
|
session.flush()
|
||||||
block(restrictedEntityManager)
|
block(restrictedEntityManager)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1258,14 +1258,18 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
|||||||
|
|
||||||
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
||||||
return CordaPersistence(
|
return CordaPersistence(
|
||||||
databaseConfig,
|
databaseConfig,
|
||||||
schemaService.schemas,
|
schemaService.schemas,
|
||||||
jdbcUrl,
|
jdbcUrl,
|
||||||
cacheFactory,
|
cacheFactory,
|
||||||
attributeConverters, customClassLoader,
|
attributeConverters, customClassLoader,
|
||||||
errorHandler = { t ->
|
errorHandler = { e ->
|
||||||
FlowStateMachineImpl.currentStateMachine()?.scheduleEvent(Event.Error(t))
|
// "corrupting" a DatabaseTransaction only inside a flow state machine execution
|
||||||
})
|
FlowStateMachineImpl.currentStateMachine()?.let {
|
||||||
|
// register only the very first exception thrown throughout a chain of logical transactions
|
||||||
|
setException(e)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name) {
|
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name) {
|
||||||
|
@ -6,6 +6,7 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
|
import net.corda.nodeapi.internal.persistence.DatabaseTransactionException
|
||||||
import net.corda.nodeapi.internal.persistence.contextDatabase
|
import net.corda.nodeapi.internal.persistence.contextDatabase
|
||||||
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
|
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
|
||||||
import java.security.SecureRandom
|
import java.security.SecureRandom
|
||||||
@ -61,11 +62,23 @@ class TransitionExecutorImpl(
|
|||||||
} else {
|
} else {
|
||||||
log.info("Error while executing $action, with event $event, erroring state", exception)
|
log.info("Error while executing $action, with event $event, erroring state", exception)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// distinguish between a DatabaseTransactionException and an actual StateTransitionException
|
||||||
|
val stateTransitionOrDatabaseTransactionException =
|
||||||
|
if (exception is DatabaseTransactionException) {
|
||||||
|
// if the exception is a DatabaseTransactionException then it is not really a StateTransitionException
|
||||||
|
// it is actually an exception that previously broke a DatabaseTransaction and was suppressed by user code
|
||||||
|
// it was rethrown on [DatabaseTransaction.commit]. Unwrap the original exception and pass it to flow hospital
|
||||||
|
exception.cause
|
||||||
|
} else {
|
||||||
|
// Wrap the exception with [StateTransitionException] for handling by the flow hospital
|
||||||
|
StateTransitionException(action, event, exception)
|
||||||
|
}
|
||||||
|
|
||||||
val newState = previousState.copy(
|
val newState = previousState.copy(
|
||||||
checkpoint = previousState.checkpoint.copy(
|
checkpoint = previousState.checkpoint.copy(
|
||||||
errorState = previousState.checkpoint.errorState.addErrors(
|
errorState = previousState.checkpoint.errorState.addErrors(
|
||||||
// Wrap the exception with [StateTransitionException] for handling by the flow hospital
|
listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
|
||||||
listOf(FlowError(secureRandom.nextLong(), StateTransitionException(action, event, exception)))
|
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
isFlowResumed = false
|
isFlowResumed = false
|
||||||
|
@ -5,6 +5,7 @@ import co.paralleluniverse.strands.Strand
|
|||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.crypto.containsAny
|
import net.corda.core.crypto.containsAny
|
||||||
|
import net.corda.core.flows.HospitalizeFlowException
|
||||||
import net.corda.core.internal.*
|
import net.corda.core.internal.*
|
||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.node.ServicesForResolution
|
import net.corda.core.node.ServicesForResolution
|
||||||
@ -26,11 +27,13 @@ import rx.Observable
|
|||||||
import rx.exceptions.OnErrorNotImplementedException
|
import rx.exceptions.OnErrorNotImplementedException
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
import java.sql.SQLException
|
||||||
import java.time.Clock
|
import java.time.Clock
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.CopyOnWriteArraySet
|
import java.util.concurrent.CopyOnWriteArraySet
|
||||||
|
import javax.persistence.PersistenceException
|
||||||
import javax.persistence.Tuple
|
import javax.persistence.Tuple
|
||||||
import javax.persistence.criteria.CriteriaBuilder
|
import javax.persistence.criteria.CriteriaBuilder
|
||||||
import javax.persistence.criteria.CriteriaUpdate
|
import javax.persistence.criteria.CriteriaUpdate
|
||||||
@ -393,12 +396,25 @@ class NodeVaultService(
|
|||||||
persistentStateService.persist(vaultUpdate.produced + vaultUpdate.references)
|
persistentStateService.persist(vaultUpdate.produced + vaultUpdate.references)
|
||||||
try {
|
try {
|
||||||
updatesPublisher.onNext(vaultUpdate)
|
updatesPublisher.onNext(vaultUpdate)
|
||||||
} catch (e: OnErrorNotImplementedException) {
|
} catch (e: Exception) {
|
||||||
log.warn("Caught an Rx.OnErrorNotImplementedException " +
|
// exception thrown here will cause the recording of transaction states to the vault being rolled back
|
||||||
"- caused by an exception in an RX observer that was unhandled " +
|
// it could cause the ledger go into an inconsistent state, therefore we should hospitalise this flow
|
||||||
"- the observer has been unsubscribed! The underlying exception will be rethrown.", e)
|
// observer code should either be fixed or ignored and have the flow retry from previous checkpoint
|
||||||
// if the observer code threw, unwrap their exception from the RX wrapper
|
log.error(
|
||||||
throw e.cause ?: e
|
"Failed to record transaction states locally " +
|
||||||
|
"- the node could be now in an inconsistent state with other peers and/or the notary " +
|
||||||
|
"- hospitalising the flow ", e
|
||||||
|
)
|
||||||
|
|
||||||
|
throw (e as? OnErrorNotImplementedException)?.let {
|
||||||
|
it.cause?.let { wrapped ->
|
||||||
|
if (wrapped is SQLException || wrapped is PersistenceException) {
|
||||||
|
wrapped
|
||||||
|
} else {
|
||||||
|
HospitalizeFlowException(wrapped)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} ?: HospitalizeFlowException(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,8 @@ object CreateStateFlow {
|
|||||||
ServiceReadState(4),
|
ServiceReadState(4),
|
||||||
ServiceCheckForState(5),
|
ServiceCheckForState(5),
|
||||||
ServiceThrowInvalidParameter(6),
|
ServiceThrowInvalidParameter(6),
|
||||||
|
ServiceThrowMotherOfAllExceptions(7),
|
||||||
|
ServiceThrowUnrecoverableError(8),
|
||||||
TxInvalidState(10),
|
TxInvalidState(10),
|
||||||
FlowSwallowErrors(100),
|
FlowSwallowErrors(100),
|
||||||
ServiceSwallowErrors(1000)
|
ServiceSwallowErrors(1000)
|
||||||
@ -56,6 +58,9 @@ object CreateStateFlow {
|
|||||||
@InitiatingFlow
|
@InitiatingFlow
|
||||||
@StartableByRPC
|
@StartableByRPC
|
||||||
class Initiator(private val randomValue: String, private val errorTarget: Int) : FlowLogic<UniqueIdentifier>() {
|
class Initiator(private val randomValue: String, private val errorTarget: Int) : FlowLogic<UniqueIdentifier>() {
|
||||||
|
companion object {
|
||||||
|
var onExitingCall: () -> Unit = {}
|
||||||
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun call(): UniqueIdentifier {
|
override fun call(): UniqueIdentifier {
|
||||||
@ -93,6 +98,7 @@ object CreateStateFlow {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.info("Test flow: returning")
|
logger.info("Test flow: returning")
|
||||||
|
onExitingCall()
|
||||||
return state.linearId
|
return state.linearId
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,12 @@
|
|||||||
package com.r3.dbfailure.workflows
|
package com.r3.dbfailure.workflows
|
||||||
|
|
||||||
import com.r3.dbfailure.contracts.DbFailureContract
|
import com.r3.dbfailure.contracts.DbFailureContract
|
||||||
|
import net.corda.core.contracts.ContractState
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.flows.StartableByRPC
|
||||||
import net.corda.core.node.AppServiceHub
|
import net.corda.core.node.AppServiceHub
|
||||||
import net.corda.core.node.services.CordaService
|
import net.corda.core.node.services.CordaService
|
||||||
|
import net.corda.core.node.services.Vault
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import java.security.InvalidParameterException
|
import java.security.InvalidParameterException
|
||||||
@ -12,10 +16,15 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
val log = contextLogger()
|
val log = contextLogger()
|
||||||
|
var onError: ((Throwable) -> Unit)? = null
|
||||||
|
|
||||||
|
// make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm)
|
||||||
|
var throwUnrecoverableError = false
|
||||||
}
|
}
|
||||||
|
|
||||||
init {
|
init {
|
||||||
services.vaultService.rawUpdates.subscribe { (_, produced) ->
|
val onNext: (Vault.Update<ContractState>) -> Unit =
|
||||||
|
{ (_, produced) ->
|
||||||
produced.forEach {
|
produced.forEach {
|
||||||
val contractState = it.state.data as? DbFailureContract.TestState
|
val contractState = it.state.data as? DbFailureContract.TestState
|
||||||
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
|
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
|
||||||
@ -26,9 +35,9 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
val session = services.jdbcSession()
|
val session = services.jdbcSession()
|
||||||
val statement = session.createStatement()
|
val statement = session.createStatement()
|
||||||
statement.execute(
|
statement.execute(
|
||||||
"UPDATE FAIL_TEST_STATES \n" +
|
"UPDATE FAIL_TEST_STATES \n" +
|
||||||
"BLAAA RANDOM_VALUE = NULL\n" +
|
"BLAAA RANDOM_VALUE = NULL\n" +
|
||||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||||
)
|
)
|
||||||
log.info("SQL result: ${statement.resultSet}")
|
log.info("SQL result: ${statement.resultSet}")
|
||||||
}
|
}
|
||||||
@ -37,9 +46,9 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
val session = services.jdbcSession()
|
val session = services.jdbcSession()
|
||||||
val statement = session.createStatement()
|
val statement = session.createStatement()
|
||||||
statement.execute(
|
statement.execute(
|
||||||
"UPDATE FAIL_TEST_STATES \n" +
|
"UPDATE FAIL_TEST_STATES \n" +
|
||||||
"SET RANDOM_VALUE = NULL\n" +
|
"SET RANDOM_VALUE = NULL\n" +
|
||||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||||
)
|
)
|
||||||
log.info("SQL result: ${statement.resultSet}")
|
log.info("SQL result: ${statement.resultSet}")
|
||||||
}
|
}
|
||||||
@ -48,9 +57,9 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
val session = services.jdbcSession()
|
val session = services.jdbcSession()
|
||||||
val statement = session.createStatement()
|
val statement = session.createStatement()
|
||||||
statement.execute(
|
statement.execute(
|
||||||
"UPDATE FAIL_TEST_STATES \n" +
|
"UPDATE FAIL_TEST_STATES \n" +
|
||||||
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
|
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
|
||||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||||
)
|
)
|
||||||
log.info("SQL result: ${statement.resultSet}")
|
log.info("SQL result: ${statement.resultSet}")
|
||||||
}
|
}
|
||||||
@ -59,8 +68,8 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
val session = services.jdbcSession()
|
val session = services.jdbcSession()
|
||||||
val statement = session.createStatement()
|
val statement = session.createStatement()
|
||||||
statement.execute(
|
statement.execute(
|
||||||
"SELECT * FROM FAIL_TEST_STATES \n" +
|
"SELECT * FROM FAIL_TEST_STATES \n" +
|
||||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||||
)
|
)
|
||||||
log.info("SQL result: ${statement.resultSet}")
|
log.info("SQL result: ${statement.resultSet}")
|
||||||
}
|
}
|
||||||
@ -69,8 +78,8 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
val session = services.jdbcSession()
|
val session = services.jdbcSession()
|
||||||
val statement = session.createStatement()
|
val statement = session.createStatement()
|
||||||
val rs = statement.executeQuery(
|
val rs = statement.executeQuery(
|
||||||
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
|
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
|
||||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||||
)
|
)
|
||||||
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
|
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
|
||||||
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
|
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
|
||||||
@ -80,13 +89,25 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
log.info("Throw InvalidParameterException")
|
log.info("Throw InvalidParameterException")
|
||||||
throw InvalidParameterException("Toys out of pram")
|
throw InvalidParameterException("Toys out of pram")
|
||||||
}
|
}
|
||||||
|
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> {
|
||||||
|
log.info("Throw Exception")
|
||||||
|
throw Exception("Mother of all exceptions")
|
||||||
|
}
|
||||||
|
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> {
|
||||||
|
// this bit of code should only work in a OutOfProcess node,
|
||||||
|
// otherwise it will kill the testing jvm (including the testing thread)
|
||||||
|
if (throwUnrecoverableError) {
|
||||||
|
log.info("Throw Unrecoverable error")
|
||||||
|
throw OutOfMemoryError("Unrecoverable error")
|
||||||
|
}
|
||||||
|
}
|
||||||
else -> {
|
else -> {
|
||||||
// do nothing, everything else must be handled elsewhere
|
// do nothing, everything else must be handled elsewhere
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (t: Throwable) {
|
} catch (t: Throwable) {
|
||||||
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
|
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
|
||||||
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
|
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
|
||||||
log.warn("Service not letting errors escape", t)
|
log.warn("Service not letting errors escape", t)
|
||||||
} else {
|
} else {
|
||||||
throw t
|
throw t
|
||||||
@ -94,5 +115,19 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (onError != null) {
|
||||||
|
services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined
|
||||||
|
} else {
|
||||||
|
services.vaultService.rawUpdates.subscribe(onNext)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@StartableByRPC
|
||||||
|
class MakeServiceThrowErrorFlow: FlowLogic<Unit>() {
|
||||||
|
override fun call() {
|
||||||
|
throwUnrecoverableError = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -0,0 +1,54 @@
|
|||||||
|
package com.r3.transactionfailure.workflows
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import com.r3.dbfailure.contracts.DbFailureContract
|
||||||
|
import com.r3.dbfailure.workflows.CreateStateFlow
|
||||||
|
import net.corda.core.contracts.Command
|
||||||
|
import net.corda.core.contracts.UniqueIdentifier
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.flows.StartableByRPC
|
||||||
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
|
|
||||||
|
object ErrorHandling {
|
||||||
|
var hookBeforeFirstCheckpoint: () -> Unit = {}
|
||||||
|
var hookAfterFirstCheckpoint: () -> Unit = {}
|
||||||
|
var hookAfterSecondCheckpoint: () -> Unit = {}
|
||||||
|
|
||||||
|
@StartableByRPC
|
||||||
|
class CheckpointAfterErrorFlow(private val errorTarget: Int) : FlowLogic<Unit>() {
|
||||||
|
// We cannot allow this:
|
||||||
|
// recordTransactions -> throws HospitalizeException
|
||||||
|
// flow suppress the HospitalizeException
|
||||||
|
// flow checkpoints
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
val notary = serviceHub.networkMapCache.notaryIdentities[0]
|
||||||
|
hookBeforeFirstCheckpoint.invoke() // should be executed once
|
||||||
|
sleep(1.seconds) // checkpoint - flow should retry from this one
|
||||||
|
hookAfterFirstCheckpoint.invoke() // should be executed twice
|
||||||
|
val txTarget = CreateStateFlow.getTxTarget(errorTarget)
|
||||||
|
val state = DbFailureContract.TestState(
|
||||||
|
UniqueIdentifier(),
|
||||||
|
ourIdentity,
|
||||||
|
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else "valid hibernate value",
|
||||||
|
errorTarget)
|
||||||
|
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
|
||||||
|
val txBuilder = TransactionBuilder(notary).addOutputState(state).addCommand(txCommand)
|
||||||
|
val signedTx = serviceHub.signInitialTransaction(txBuilder)
|
||||||
|
try {
|
||||||
|
serviceHub.recordTransactions(signedTx)
|
||||||
|
} catch(t: Throwable) {
|
||||||
|
if (CreateStateFlow.getFlowTarget(errorTarget) == CreateStateFlow.ErrorTarget.FlowSwallowErrors) {
|
||||||
|
logger.info("Test flow: Swallowing all exception! Muahahaha!", t)
|
||||||
|
} else {
|
||||||
|
logger.info("Test flow: caught exception - rethrowing")
|
||||||
|
throw t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sleep(1.seconds) // checkpoint - this checkpoint should fail
|
||||||
|
hookAfterSecondCheckpoint.invoke() // should be never executed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Reference in New Issue
Block a user