mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
CORDA-3217 and CORDA-3195 Various bits arond SQL exceptions and flow hospital (#2605)
* Unwrap rx.OnErrorNotImplementedException so the hospital can handle the cause appropriately * Add db failure cordapp * Renamed folders to avoid ambiguity in gradle * Add integration test for exception hospitalisation when thrown from an RX observable. * Make the test slightly cleaner * Fix the schema to actually match the requirements for my custom state. Thanks a bunch, H2. * Switch test to use SqlException base class. * Schedule error event if we detect that a commit or db flush has thrown (forcing the flow to error even if customer code then goes ahead to swallow the exception) * Revert change to schedule extra error * Add more tests for edge case with DB exceptions, changed CorDapp to suppor this an hook in the flow hospital * Warning about unsubscribe Check state transitioned from clean to error for hospital admission. * Match the test to our actual expectations * Revert "Revert change to schedule extra error" This reverts commit 43d47937 * Prevent suppression of errors arising in `transaction()` and `jdbcConnection()` * Test for SqlException caught trying to escape from recordTransaction and suppressed outside being intercepted. * More tests for various error/catch combinations * Clean up and comments * Code reformat * Fix test compilation
This commit is contained in:
parent
9b169df2b8
commit
1f71b071aa
@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.persistence.AttributeConverter
|
||||
import javax.persistence.PersistenceException
|
||||
import javax.sql.DataSource
|
||||
|
||||
/**
|
||||
@ -98,7 +99,8 @@ class CordaPersistence(
|
||||
cacheFactory: NamedCacheFactory,
|
||||
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(),
|
||||
customClassLoader: ClassLoader? = null,
|
||||
val closeConnection: Boolean = true
|
||||
val closeConnection: Boolean = true,
|
||||
val errorHandler: (t: Throwable) -> Unit = {}
|
||||
) : Closeable {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
@ -189,10 +191,18 @@ class CordaPersistence(
|
||||
}
|
||||
|
||||
fun createSession(): Connection {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
_contextDatabase.set(this)
|
||||
currentDBSession().flush()
|
||||
return contextTransaction.connection
|
||||
try {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
_contextDatabase.set(this)
|
||||
currentDBSession().flush()
|
||||
return contextTransaction.connection
|
||||
} catch (sqlException: SQLException) {
|
||||
errorHandler(sqlException)
|
||||
throw sqlException
|
||||
} catch (persistenceException: PersistenceException) {
|
||||
errorHandler(persistenceException)
|
||||
throw persistenceException
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -220,10 +230,18 @@ class CordaPersistence(
|
||||
recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T {
|
||||
_contextDatabase.set(this)
|
||||
val outer = contextTransactionOrNull
|
||||
return if (outer != null) {
|
||||
outer.statement()
|
||||
} else {
|
||||
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
|
||||
try {
|
||||
return if (outer != null) {
|
||||
outer.statement()
|
||||
} else {
|
||||
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
|
||||
}
|
||||
} catch (sqlException: SQLException) {
|
||||
errorHandler(sqlException)
|
||||
throw sqlException
|
||||
} catch (persistenceException: PersistenceException) {
|
||||
errorHandler(persistenceException)
|
||||
throw persistenceException
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,6 +243,8 @@ dependencies {
|
||||
slowIntegrationTestCompile configurations.testCompile
|
||||
slowIntegrationTestRuntime configurations.runtime
|
||||
slowIntegrationTestRuntime configurations.testRuntime
|
||||
|
||||
testCompile project(':testing:cordapps:dbfailure:dbfworkflows')
|
||||
}
|
||||
|
||||
tasks.withType(JavaCompile) {
|
||||
|
@ -140,7 +140,7 @@ class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
initiatingSession.receive<String>().unwrap { it }
|
||||
throw ClientRelevantException("Something went wrong!", SQLException("Oops!"))
|
||||
throw Exception("Something went wrong!", SQLException("Oops!"))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,289 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import com.r3.dbfailure.workflows.CreateStateFlow
|
||||
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.node.services.statemachine.StateTransitionException
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.findCordapp
|
||||
import org.junit.After
|
||||
import org.junit.Assert
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import rx.exceptions.OnErrorNotImplementedException
|
||||
import java.sql.SQLException
|
||||
import javax.persistence.PersistenceException
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
class VaultObserverExceptionTest : IntegrationTest() {
|
||||
companion object {
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME, DUMMY_NOTARY_NAME)
|
||||
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
||||
@After
|
||||
override fun tearDown() {
|
||||
super.tearDown()
|
||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
||||
StaffedFlowHospital.onFlowAdmitted.clear()
|
||||
}
|
||||
|
||||
/**
|
||||
* Causing an SqlException via a syntax error in a vault observer causes the flow to hit the
|
||||
* DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation
|
||||
*/
|
||||
@Test
|
||||
fun unhandledSqlExceptionFromVaultObserverGetsHospitatlised() {
|
||||
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
|
||||
|
||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||
when (it) {
|
||||
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
|
||||
is SQLException -> {
|
||||
testControlFuture.complete(true)
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
aliceNode.rpc.startFlow(::Initiator, "Syntax Error in Custom SQL", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError))
|
||||
.returnValue.then { testControlFuture.complete(false) }
|
||||
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
|
||||
|
||||
Assert.assertTrue(foundExpectedException)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throwing a random (non-SQL releated) exception from a vault observer causes the flow to be
|
||||
* aborted when unhandled in user code
|
||||
*/
|
||||
@Test
|
||||
fun otherExceptionsFromVaultObserverBringFlowDown() {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
assertFailsWith(CordaRuntimeException::class, "Toys out of pram") {
|
||||
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter))
|
||||
.returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A random exception from a VaultObserver will bring the Rx Observer down, but can be handled in the flow
|
||||
* triggering the observer, and the flow will continue successfully (for some values of success)
|
||||
*/
|
||||
@Test
|
||||
fun otherExceptionsFromVaultObserverCanBeSuppressedInFlow() {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
.returnValue.getOrThrow(30.seconds)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the state we are trying to persist triggers a ConstraintViolation, the flow hospital will retry the flow
|
||||
* and finally give up on it
|
||||
*/
|
||||
@Test
|
||||
fun constraintViolationOnCommitGetsRetriedAndFinallyFails() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.onFlowAdmitted.add {
|
||||
++counter
|
||||
}
|
||||
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
assertFailsWith(StateTransitionException::class, "could not execute statement") {
|
||||
aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState))
|
||||
.returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
Assert.assertTrue("Exception from service has not been to Hospital", counter > 0)
|
||||
}
|
||||
|
||||
/**
|
||||
* If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in
|
||||
* the vault observer will trigger a flush that throws. This will be retried, and finally fail.
|
||||
*/
|
||||
@Test
|
||||
fun constraintViolationOnFlushGetsRetriedAndFinallyFails() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||
when (it) {
|
||||
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
|
||||
is PersistenceException -> {
|
||||
++counter
|
||||
log.info("Got a PersistentException in the flow hospital count = $counter")
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
assertFailsWith<StateTransitionException>("ConstraintViolationException") {
|
||||
aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState))
|
||||
.returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
}
|
||||
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
||||
}
|
||||
|
||||
/**
|
||||
* If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in
|
||||
* the vault observer will trigger a flush that throws. This will be retried, and finally fail.
|
||||
* Trying to catch and suppress that exception in the flow around the code triggering the vault observer
|
||||
* does not change the outcome - the first exception in the service will bring the service down and will
|
||||
* be caught by the flow, but the state machine will error the flow anyway as Corda code threw.
|
||||
* On retry, the error will hit the commit, as the observer is dead, and fail as above.
|
||||
*/
|
||||
@Test
|
||||
fun constraintViolationOnFlushInVaultObserverCannotBeSuppressedInFlow() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||
when (it) {
|
||||
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
|
||||
is PersistenceException -> {
|
||||
++counter
|
||||
log.info("Got a PersistentException in the flow hospital count = $counter")
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState, CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
val flowResult = flowHandle.returnValue
|
||||
assertFailsWith<StateTransitionException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
|
||||
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in
|
||||
* the vault observer will trigger a flush that throws. This will be retried, and finally fail.
|
||||
* Trying to catch and suppress that exception inside the service does protect the service, but the new
|
||||
* interceptor will fail the flow anyway. It will be retried and finally fail when the hospital gives
|
||||
* up retrying.
|
||||
*/
|
||||
@Test
|
||||
fun constraintViolationOnFlushInVaultObserverCannotBeSuppressedInService() {
|
||||
var counter = 0
|
||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||
when (it) {
|
||||
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
|
||||
is PersistenceException -> {
|
||||
++counter
|
||||
log.info("Got a PersistentException in the flow hospital count = $counter")
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceValidUpdate,
|
||||
CreateStateFlow.ErrorTarget.TxInvalidState,
|
||||
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
|
||||
val flowResult = flowHandle.returnValue
|
||||
assertFailsWith<CordaRuntimeException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
|
||||
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* User code throwing a constraint violation in a raw vault observer will break the recordTransaction call,
|
||||
* therefore handling it in flow code is no good, and the error will be passed to the flow hospital via the
|
||||
* interceptor.
|
||||
*/
|
||||
@Test
|
||||
fun constraintViolationInUserCodeInServiceCannotBeSuppressedInFlow() {
|
||||
val testControlFuture = openFuture<Boolean>()
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
|
||||
log.info("Flow has been kept for overnight observation")
|
||||
testControlFuture.set(true)
|
||||
}
|
||||
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
val flowResult = flowHandle.returnValue
|
||||
flowResult.then {
|
||||
log.info("Flow has finished")
|
||||
testControlFuture.set(false)
|
||||
}
|
||||
Assert.assertTrue("Flow has not been kept in hospital", testControlFuture.getOrThrow(30.seconds))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* User code throwing a constraint violation and catching suppressing that within the observer code is fine
|
||||
* and should not have any impact on the rest of the flow
|
||||
*/
|
||||
@Test
|
||||
fun constraintViolationInUserCodeInServiceCanBeSuppressedInService() {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation,
|
||||
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
|
||||
val flowResult = flowHandle.returnValue
|
||||
flowResult.getOrThrow(30.seconds)
|
||||
}
|
||||
}
|
||||
}
|
@ -1141,7 +1141,9 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
||||
org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||
val attributeConverters = listOf(PublicKeyToTextConverter(), AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
||||
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, cacheFactory, attributeConverters, customClassLoader)
|
||||
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, cacheFactory, attributeConverters, customClassLoader, errorHandler = {t ->
|
||||
FlowStateMachineImpl.currentStateMachine()?.scheduleEvent(Event.Error(t))
|
||||
})
|
||||
}
|
||||
|
||||
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name) {
|
||||
|
@ -38,6 +38,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
||||
DatabaseEndocrinologist,
|
||||
TransitionErrorGeneralPractitioner
|
||||
)
|
||||
|
||||
@VisibleForTesting
|
||||
val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
|
||||
|
||||
@VisibleForTesting
|
||||
val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>()
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(object {
|
||||
@ -110,6 +116,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
||||
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
|
||||
val time = Instant.now()
|
||||
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
|
||||
onFlowAdmitted.forEach { it.invoke(flowFiber.id) }
|
||||
|
||||
val (event, backOffForChronicCondition) = mutex.locked {
|
||||
val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() }
|
||||
@ -125,6 +132,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
||||
Diagnosis.OVERNIGHT_OBSERVATION -> {
|
||||
log.info("Flow error kept for overnight observation by ${report.by} (error was ${report.error.message})")
|
||||
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
|
||||
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
|
||||
Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds)
|
||||
}
|
||||
Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> {
|
||||
|
@ -44,7 +44,7 @@ class HospitalisingInterceptor(
|
||||
|
||||
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
|
||||
|
||||
if (nextState.checkpoint.errorState is ErrorState.Errored) {
|
||||
if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) {
|
||||
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception }
|
||||
if (hospitalisedFlows.putIfAbsent(fiber.id, fiber) == null) {
|
||||
flowHospital.flowErrored(fiber, previousState, exceptionsToHandle)
|
||||
|
@ -23,6 +23,7 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.nodeapi.internal.persistence.*
|
||||
import org.hibernate.Session
|
||||
import rx.Observable
|
||||
import rx.exceptions.OnErrorNotImplementedException
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.PublicKey
|
||||
import java.time.Clock
|
||||
@ -390,7 +391,13 @@ class NodeVaultService(
|
||||
}
|
||||
}
|
||||
persistentStateService.persist(vaultUpdate.produced + vaultUpdate.references)
|
||||
updatesPublisher.onNext(vaultUpdate)
|
||||
try {
|
||||
updatesPublisher.onNext(vaultUpdate)
|
||||
} catch (e: OnErrorNotImplementedException) {
|
||||
log.warn("Caught an Rx.OnErrorNotImplementedException - caused by an exception in an RX observer that was unhandled - the observer has been unsubscribed! The underlying exception will be rethrown.", e)
|
||||
// if the observer code threw, unwrap their exception from the RX wrapper
|
||||
throw e.cause ?: e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -78,6 +78,8 @@ include 'samples:network-verifier:contracts'
|
||||
include 'samples:network-verifier:workflows'
|
||||
include 'serialization'
|
||||
include 'serialization-tests'
|
||||
include 'testing:cordapps:dbfailure:dbfcontracts'
|
||||
include 'testing:cordapps:dbfailure:dbfworkflows'
|
||||
|
||||
// Common libraries - start
|
||||
include 'common-validation'
|
||||
|
18
testing/cordapps/dbfailure/dbfcontracts/build.gradle
Normal file
18
testing/cordapps/dbfailure/dbfcontracts/build.gradle
Normal file
@ -0,0 +1,18 @@
|
||||
apply plugin: 'kotlin'
|
||||
//apply plugin: 'net.corda.plugins.cordapp'
|
||||
//apply plugin: 'net.corda.plugins.quasar-utils'
|
||||
|
||||
repositories {
|
||||
mavenLocal()
|
||||
mavenCentral()
|
||||
maven { url "$artifactory_contextUrl/corda-dependencies" }
|
||||
maven { url "$artifactory_contextUrl/corda" }
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compile project(":core")
|
||||
}
|
||||
|
||||
jar{
|
||||
baseName "testing-dbfailure-contracts"
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
package com.r3.dbfailure.contracts
|
||||
|
||||
import com.r3.dbfailure.schemas.DbFailureSchemaV1
|
||||
import net.corda.core.contracts.CommandData
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.LinearState
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import java.lang.IllegalArgumentException
|
||||
|
||||
class DbFailureContract : Contract {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
val ID = "com.r3.dbfailure.contracts.DbFailureContract"
|
||||
}
|
||||
|
||||
class TestState(override val linearId: UniqueIdentifier, val particpant: Party, val randomValue: String?, val errorTarget: Int = 0) : LinearState, QueryableState {
|
||||
|
||||
override val participants: List<AbstractParty> = listOf(particpant)
|
||||
|
||||
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(DbFailureSchemaV1)
|
||||
|
||||
override fun generateMappedObject(schema: MappedSchema): PersistentState {
|
||||
return if (schema is DbFailureSchemaV1){
|
||||
DbFailureSchemaV1.PersistentTestState( particpant.name.toString(), randomValue, errorTarget, linearId.id)
|
||||
}
|
||||
else {
|
||||
throw IllegalArgumentException("Unsupported schema $schema")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
// no op - don't care for now
|
||||
}
|
||||
|
||||
interface Commands : CommandData{
|
||||
class Create: Commands
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package com.r3.dbfailure.schemas
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import java.util.*
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Table
|
||||
|
||||
object DbFailureSchema
|
||||
|
||||
object DbFailureSchemaV1 : MappedSchema(
|
||||
schemaFamily = DbFailureSchema.javaClass,
|
||||
version = 1,
|
||||
mappedTypes = listOf(DbFailureSchemaV1.PersistentTestState::class.java)){
|
||||
override val migrationResource = "dbfailure.changelog-master"
|
||||
|
||||
@Entity
|
||||
@Table( name = "fail_test_states")
|
||||
class PersistentTestState(
|
||||
@Column( name = "participant")
|
||||
var participantName: String,
|
||||
|
||||
@Column( name = "random_value")
|
||||
var randomValue: String?,
|
||||
|
||||
@Column( name = "error_target")
|
||||
var errorTarget: Int,
|
||||
|
||||
@Column( name = "linear_id")
|
||||
var linearId: UUID
|
||||
) : PersistentState() {
|
||||
constructor() : this( "", "", 0, UUID.randomUUID())
|
||||
}
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" >
|
||||
<changeSet author="R3.Corda" id="test dbfailure error target">
|
||||
<addColumn tableName="fail_test_states">
|
||||
<column name="error_target" type="INT"></column>
|
||||
</addColumn>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -0,0 +1,20 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" >
|
||||
<changeSet author="R3.Corda" id="test dbfailure init">
|
||||
<createTable tableName="fail_test_states">
|
||||
<column name="output_index" type="INT">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="transaction_id" type="NVARCHAR(64)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="participant" type="NVARCHAR(255)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="random_value" type="NVARCHAR(255)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="linear_id" type="BINARY(255)"/>
|
||||
</createTable>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -0,0 +1,8 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<include file="migration/dbfailure.changelog-init.xml"/>
|
||||
<include file="migration/dbfailure.changelog-errortarget.xml"/>
|
||||
</databaseChangeLog>
|
12
testing/cordapps/dbfailure/dbfworkflows/build.gradle
Normal file
12
testing/cordapps/dbfailure/dbfworkflows/build.gradle
Normal file
@ -0,0 +1,12 @@
|
||||
apply plugin: 'kotlin'
|
||||
//apply plugin: 'net.corda.plugins.cordapp'
|
||||
//apply plugin: 'net.corda.plugins.quasar-utils'
|
||||
|
||||
dependencies {
|
||||
compile project(":core")
|
||||
compile project(":testing:cordapps:dbfailure:dbfcontracts")
|
||||
}
|
||||
|
||||
jar{
|
||||
baseName "testing-dbfailure-workflows"
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
package com.r3.dbfailure.workflows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.r3.dbfailure.contracts.DbFailureContract
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
|
||||
object CreateStateFlow {
|
||||
|
||||
enum class ErrorTarget(val targetNumber: Int) {
|
||||
NoError(0),
|
||||
ServiceSqlSyntaxError(1),
|
||||
ServiceNullConstraintViolation(2),
|
||||
ServiceValidUpdate(3),
|
||||
ServiceReadState(4),
|
||||
ServiceCheckForState(5),
|
||||
ServiceThrowInvalidParameter(6),
|
||||
TxInvalidState(10),
|
||||
FlowSwallowErrors(100),
|
||||
ServiceSwallowErrors(1000)
|
||||
}
|
||||
|
||||
fun errorTargetsToNum(vararg targets: ErrorTarget): Int {
|
||||
return targets.map { it.targetNumber }.sum()
|
||||
}
|
||||
|
||||
private val targetMap = ErrorTarget.values().associateBy(ErrorTarget::targetNumber)
|
||||
|
||||
fun getServiceTarget(target: Int?): ErrorTarget {
|
||||
return target?.let { targetMap.getValue(it % 10) } ?: CreateStateFlow.ErrorTarget.NoError
|
||||
}
|
||||
|
||||
fun getServiceExceptionHandlingTarget(target: Int?): ErrorTarget {
|
||||
return target?.let { targetMap.getValue(((it / 1000) % 10) * 1000) } ?: CreateStateFlow.ErrorTarget.NoError
|
||||
}
|
||||
|
||||
fun getTxTarget(target: Int?): ErrorTarget {
|
||||
return target?.let { targetMap.getValue(((it / 10) % 10) * 10) } ?: CreateStateFlow.ErrorTarget.NoError
|
||||
}
|
||||
|
||||
fun getFlowTarget(target: Int?): ErrorTarget {
|
||||
return target?.let { targetMap.getValue(((it / 100) % 10) * 100) } ?: CreateStateFlow.ErrorTarget.NoError
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class Initiator(private val randomValue: String, private val errorTarget: Int) : FlowLogic<UniqueIdentifier>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): UniqueIdentifier {
|
||||
logger.info("Test flow: starting")
|
||||
val notary = serviceHub.networkMapCache.notaryIdentities[0]
|
||||
val txTarget = getTxTarget(errorTarget)
|
||||
logger.info("Test flow: The tx error target is $txTarget")
|
||||
val state = DbFailureContract.TestState(UniqueIdentifier(), ourIdentity, if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue, errorTarget)
|
||||
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
|
||||
|
||||
logger.info("Test flow: tx builder")
|
||||
val txBuilder = TransactionBuilder(notary)
|
||||
.addOutputState(state)
|
||||
.addCommand(txCommand)
|
||||
|
||||
logger.info("Test flow: verify")
|
||||
txBuilder.verify(serviceHub)
|
||||
|
||||
val signedTx = serviceHub.signInitialTransaction(txBuilder)
|
||||
|
||||
try {
|
||||
logger.info("Test flow: recording transaction")
|
||||
serviceHub.recordTransactions(signedTx)
|
||||
} catch (t: Throwable) {
|
||||
if (getFlowTarget(errorTarget) == CreateStateFlow.ErrorTarget.FlowSwallowErrors) {
|
||||
logger.info("Test flow: Swallowing all exception! Muahahaha!", t)
|
||||
} else {
|
||||
logger.info("Test flow: caught exception - rethrowing")
|
||||
throw t
|
||||
}
|
||||
}
|
||||
logger.info("Test flow: returning")
|
||||
return state.linearId
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,92 @@
|
||||
package com.r3.dbfailure.workflows
|
||||
|
||||
import com.r3.dbfailure.contracts.DbFailureContract
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import java.security.InvalidParameterException
|
||||
|
||||
@CordaService
|
||||
class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
||||
|
||||
companion object {
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
||||
init {
|
||||
services.vaultService.rawUpdates.subscribe { (_, produced) ->
|
||||
produced.forEach {
|
||||
val contractState = it.state.data as? DbFailureContract.TestState
|
||||
try {
|
||||
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
|
||||
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
|
||||
log.info("Fail with syntax error on raw statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"UPDATE FAIL_TEST_STATES \n" +
|
||||
"BLAAA RANDOM_VALUE = NULL\n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> {
|
||||
log.info("Fail with null constraint violation on raw statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"UPDATE FAIL_TEST_STATES \n" +
|
||||
"SET RANDOM_VALUE = NULL\n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceValidUpdate -> {
|
||||
log.info("Update current statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"UPDATE FAIL_TEST_STATES \n" +
|
||||
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceReadState -> {
|
||||
log.info("Read current state from db")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"SELECT * FROM FAIL_TEST_STATES \n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceCheckForState -> {
|
||||
log.info("Check for currently written state in the db")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
val rs = statement.executeQuery(
|
||||
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
|
||||
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
|
||||
log.info("Throw InvalidParameterException")
|
||||
throw InvalidParameterException("Toys out of pram")
|
||||
}
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget) == CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
|
||||
log.warn("Service not letting errors escape", t)
|
||||
} else {
|
||||
throw t
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user