diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt index c0e6ff1077..371177f26b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.AtomicInteger import javax.persistence.AttributeConverter +import javax.persistence.PersistenceException import javax.sql.DataSource /** @@ -98,7 +99,8 @@ class CordaPersistence( cacheFactory: NamedCacheFactory, attributeConverters: Collection> = emptySet(), customClassLoader: ClassLoader? = null, - val closeConnection: Boolean = true + val closeConnection: Boolean = true, + val errorHandler: (t: Throwable) -> Unit = {} ) : Closeable { companion object { private val log = contextLogger() @@ -189,10 +191,18 @@ class CordaPersistence( } fun createSession(): Connection { - // We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases. - _contextDatabase.set(this) - currentDBSession().flush() - return contextTransaction.connection + try { + // We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases. + _contextDatabase.set(this) + currentDBSession().flush() + return contextTransaction.connection + } catch (sqlException: SQLException) { + errorHandler(sqlException) + throw sqlException + } catch (persistenceException: PersistenceException) { + errorHandler(persistenceException) + throw persistenceException + } } /** @@ -220,10 +230,18 @@ class CordaPersistence( recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T { _contextDatabase.set(this) val outer = contextTransactionOrNull - return if (outer != null) { - outer.statement() - } else { - inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement) + try { + return if (outer != null) { + outer.statement() + } else { + inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement) + } + } catch (sqlException: SQLException) { + errorHandler(sqlException) + throw sqlException + } catch (persistenceException: PersistenceException) { + errorHandler(persistenceException) + throw persistenceException } } diff --git a/node/build.gradle b/node/build.gradle index 0e17f38358..18bf6174a5 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -243,6 +243,8 @@ dependencies { slowIntegrationTestCompile configurations.testCompile slowIntegrationTestRuntime configurations.runtime slowIntegrationTestRuntime configurations.testRuntime + + testCompile project(':testing:cordapps:dbfailure:dbfworkflows') } tasks.withType(JavaCompile) { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt index 1c5c89d776..5b7c26a7d1 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt @@ -140,7 +140,7 @@ class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic().unwrap { it } - throw ClientRelevantException("Something went wrong!", SQLException("Oops!")) + throw Exception("Something went wrong!", SQLException("Oops!")) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt new file mode 100644 index 0000000000..7aefce864d --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -0,0 +1,289 @@ +package net.corda.node.services.vault + +import com.r3.dbfailure.workflows.CreateStateFlow +import com.r3.dbfailure.workflows.CreateStateFlow.Initiator +import net.corda.core.CordaRuntimeException +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.node.services.Permissions +import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.node.services.statemachine.StateTransitionException +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.internal.IntegrationTest +import net.corda.testing.internal.IntegrationTestSchemas +import net.corda.testing.node.User +import net.corda.testing.node.internal.findCordapp +import org.junit.After +import org.junit.Assert +import org.junit.ClassRule +import org.junit.Test +import rx.exceptions.OnErrorNotImplementedException +import java.sql.SQLException +import javax.persistence.PersistenceException +import kotlin.test.assertFailsWith + +class VaultObserverExceptionTest : IntegrationTest() { + companion object { + @ClassRule + @JvmField + val databaseSchemas = IntegrationTestSchemas(ALICE_NAME, DUMMY_NOTARY_NAME) + + val log = contextLogger() + } + + @After + override fun tearDown() { + super.tearDown() + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear() + StaffedFlowHospital.onFlowKeptForOvernightObservation.clear() + StaffedFlowHospital.onFlowAdmitted.clear() + } + + /** + * Causing an SqlException via a syntax error in a vault observer causes the flow to hit the + * DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation + */ + @Test + fun unhandledSqlExceptionFromVaultObserverGetsHospitatlised() { + val testControlFuture = openFuture().toCompletableFuture() + + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { + when (it) { + is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped") + is SQLException -> { + testControlFuture.complete(true) + } + } + false + } + + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + aliceNode.rpc.startFlow(::Initiator, "Syntax Error in Custom SQL", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)) + .returnValue.then { testControlFuture.complete(false) } + val foundExpectedException = testControlFuture.getOrThrow(30.seconds) + + Assert.assertTrue(foundExpectedException) + } + } + + /** + * Throwing a random (non-SQL releated) exception from a vault observer causes the flow to be + * aborted when unhandled in user code + */ + @Test + fun otherExceptionsFromVaultObserverBringFlowDown() { + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + assertFailsWith(CordaRuntimeException::class, "Toys out of pram") { + aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter)) + .returnValue.getOrThrow(30.seconds) + } + } + } + + /** + * A random exception from a VaultObserver will bring the Rx Observer down, but can be handled in the flow + * triggering the observer, and the flow will continue successfully (for some values of success) + */ + @Test + fun otherExceptionsFromVaultObserverCanBeSuppressedInFlow() { + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, + CreateStateFlow.ErrorTarget.FlowSwallowErrors)) + .returnValue.getOrThrow(30.seconds) + + } + } + + /** + * If the state we are trying to persist triggers a ConstraintViolation, the flow hospital will retry the flow + * and finally give up on it + */ + @Test + fun constraintViolationOnCommitGetsRetriedAndFinallyFails() { + var counter = 0 + StaffedFlowHospital.onFlowAdmitted.add { + ++counter + } + + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + assertFailsWith(StateTransitionException::class, "could not execute statement") { + aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState)) + .returnValue.getOrThrow() + } + } + Assert.assertTrue("Exception from service has not been to Hospital", counter > 0) + } + + /** + * If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in + * the vault observer will trigger a flush that throws. This will be retried, and finally fail. + */ + @Test + fun constraintViolationOnFlushGetsRetriedAndFinallyFails() { + var counter = 0 + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { + when (it) { + is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped") + is PersistenceException -> { + ++counter + log.info("Got a PersistentException in the flow hospital count = $counter") + } + } + false + } + + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + assertFailsWith("ConstraintViolationException") { + aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState)) + .returnValue.getOrThrow(30.seconds) + } + } + Assert.assertTrue("Flow has not been to hospital", counter > 0) + } + + /** + * If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in + * the vault observer will trigger a flush that throws. This will be retried, and finally fail. + * Trying to catch and suppress that exception in the flow around the code triggering the vault observer + * does not change the outcome - the first exception in the service will bring the service down and will + * be caught by the flow, but the state machine will error the flow anyway as Corda code threw. + * On retry, the error will hit the commit, as the observer is dead, and fail as above. + */ + @Test + fun constraintViolationOnFlushInVaultObserverCannotBeSuppressedInFlow() { + var counter = 0 + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { + when (it) { + is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped") + is PersistenceException -> { + ++counter + log.info("Got a PersistentException in the flow hospital count = $counter") + } + } + false + } + + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState, CreateStateFlow.ErrorTarget.FlowSwallowErrors)) + val flowResult = flowHandle.returnValue + assertFailsWith("ConstraintViolation") { flowResult.getOrThrow(30.seconds) } + Assert.assertTrue("Flow has not been to hospital", counter > 0) + } + } + + /** + * If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in + * the vault observer will trigger a flush that throws. This will be retried, and finally fail. + * Trying to catch and suppress that exception inside the service does protect the service, but the new + * interceptor will fail the flow anyway. It will be retried and finally fail when the hospital gives + * up retrying. + */ + @Test + fun constraintViolationOnFlushInVaultObserverCannotBeSuppressedInService() { + var counter = 0 + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { + when (it) { + is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped") + is PersistenceException -> { + ++counter + log.info("Got a PersistentException in the flow hospital count = $counter") + } + } + false + } + + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceValidUpdate, + CreateStateFlow.ErrorTarget.TxInvalidState, + CreateStateFlow.ErrorTarget.ServiceSwallowErrors)) + val flowResult = flowHandle.returnValue + assertFailsWith("ConstraintViolation") { flowResult.getOrThrow(30.seconds) } + Assert.assertTrue("Flow has not been to hospital", counter > 0) + } + } + + /** + * User code throwing a constraint violation in a raw vault observer will break the recordTransaction call, + * therefore handling it in flow code is no good, and the error will be passed to the flow hospital via the + * interceptor. + */ + @Test + fun constraintViolationInUserCodeInServiceCannotBeSuppressedInFlow() { + val testControlFuture = openFuture() + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> + log.info("Flow has been kept for overnight observation") + testControlFuture.set(true) + } + + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation, + CreateStateFlow.ErrorTarget.FlowSwallowErrors)) + val flowResult = flowHandle.returnValue + flowResult.then { + log.info("Flow has finished") + testControlFuture.set(false) + } + Assert.assertTrue("Flow has not been kept in hospital", testControlFuture.getOrThrow(30.seconds)) + } + } + + /** + * User code throwing a constraint violation and catching suppressing that within the observer code is fine + * and should not have any impact on the rest of the flow + */ + @Test + fun constraintViolationInUserCodeInServiceCanBeSuppressedInService() { + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation, + CreateStateFlow.ErrorTarget.ServiceSwallowErrors)) + val flowResult = flowHandle.returnValue + flowResult.getOrThrow(30.seconds) + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 055e5e40cd..44c29f3a70 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -1141,7 +1141,9 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig, org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous)) val attributeConverters = listOf(PublicKeyToTextConverter(), AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous)) val jdbcUrl = hikariProperties.getProperty("dataSource.url", "") - return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, cacheFactory, attributeConverters, customClassLoader) + return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, cacheFactory, attributeConverters, customClassLoader, errorHandler = {t -> + FlowStateMachineImpl.currentStateMachine()?.scheduleEvent(Event.Error(t)) + }) } fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name) { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 98f2e1b447..5b7f52e6ce 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -38,6 +38,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val DatabaseEndocrinologist, TransitionErrorGeneralPractitioner ) + + @VisibleForTesting + val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List) -> Unit>() + + @VisibleForTesting + val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>() } private val mutex = ThreadBox(object { @@ -110,6 +116,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { val time = Instant.now() log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState") + onFlowAdmitted.forEach { it.invoke(flowFiber.id) } val (event, backOffForChronicCondition) = mutex.locked { val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() } @@ -125,6 +132,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val Diagnosis.OVERNIGHT_OBSERVATION -> { log.info("Flow error kept for overnight observation by ${report.by} (error was ${report.error.message})") // We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart + onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) } Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds) } Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt index 201f8d0010..812326f9b3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt @@ -44,7 +44,7 @@ class HospitalisingInterceptor( val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) - if (nextState.checkpoint.errorState is ErrorState.Errored) { + if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) { val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception } if (hospitalisedFlows.putIfAbsent(fiber.id, fiber) == null) { flowHospital.flowErrored(fiber, previousState, exceptionsToHandle) diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 4e7e6915b7..a12da4be7f 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -23,6 +23,7 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.nodeapi.internal.persistence.* import org.hibernate.Session import rx.Observable +import rx.exceptions.OnErrorNotImplementedException import rx.subjects.PublishSubject import java.security.PublicKey import java.time.Clock @@ -390,7 +391,13 @@ class NodeVaultService( } } persistentStateService.persist(vaultUpdate.produced + vaultUpdate.references) - updatesPublisher.onNext(vaultUpdate) + try { + updatesPublisher.onNext(vaultUpdate) + } catch (e: OnErrorNotImplementedException) { + log.warn("Caught an Rx.OnErrorNotImplementedException - caused by an exception in an RX observer that was unhandled - the observer has been unsubscribed! The underlying exception will be rethrown.", e) + // if the observer code threw, unwrap their exception from the RX wrapper + throw e.cause ?: e + } } } } diff --git a/settings.gradle b/settings.gradle index d9f6c9f20f..056f1e0f18 100644 --- a/settings.gradle +++ b/settings.gradle @@ -78,6 +78,8 @@ include 'samples:network-verifier:contracts' include 'samples:network-verifier:workflows' include 'serialization' include 'serialization-tests' +include 'testing:cordapps:dbfailure:dbfcontracts' +include 'testing:cordapps:dbfailure:dbfworkflows' // Common libraries - start include 'common-validation' diff --git a/testing/cordapps/dbfailure/dbfcontracts/build.gradle b/testing/cordapps/dbfailure/dbfcontracts/build.gradle new file mode 100644 index 0000000000..76dd8b8082 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfcontracts/build.gradle @@ -0,0 +1,18 @@ +apply plugin: 'kotlin' +//apply plugin: 'net.corda.plugins.cordapp' +//apply plugin: 'net.corda.plugins.quasar-utils' + +repositories { + mavenLocal() + mavenCentral() + maven { url "$artifactory_contextUrl/corda-dependencies" } + maven { url "$artifactory_contextUrl/corda" } +} + +dependencies { + compile project(":core") +} + +jar{ + baseName "testing-dbfailure-contracts" +} \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt b/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt new file mode 100644 index 0000000000..543e734a9d --- /dev/null +++ b/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt @@ -0,0 +1,45 @@ +package com.r3.dbfailure.contracts + +import com.r3.dbfailure.schemas.DbFailureSchemaV1 +import net.corda.core.contracts.CommandData +import net.corda.core.contracts.Contract +import net.corda.core.contracts.LinearState +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.Party +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.PersistentState +import net.corda.core.schemas.QueryableState +import net.corda.core.transactions.LedgerTransaction +import java.lang.IllegalArgumentException + +class DbFailureContract : Contract { + companion object { + @JvmStatic + val ID = "com.r3.dbfailure.contracts.DbFailureContract" + } + + class TestState(override val linearId: UniqueIdentifier, val particpant: Party, val randomValue: String?, val errorTarget: Int = 0) : LinearState, QueryableState { + + override val participants: List = listOf(particpant) + + override fun supportedSchemas(): Iterable = listOf(DbFailureSchemaV1) + + override fun generateMappedObject(schema: MappedSchema): PersistentState { + return if (schema is DbFailureSchemaV1){ + DbFailureSchemaV1.PersistentTestState( particpant.name.toString(), randomValue, errorTarget, linearId.id) + } + else { + throw IllegalArgumentException("Unsupported schema $schema") + } + } + } + + override fun verify(tx: LedgerTransaction) { + // no op - don't care for now + } + + interface Commands : CommandData{ + class Create: Commands + } +} \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/schemas/DbFailureSchema.kt b/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/schemas/DbFailureSchema.kt new file mode 100644 index 0000000000..e41ae4c6ff --- /dev/null +++ b/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/schemas/DbFailureSchema.kt @@ -0,0 +1,35 @@ +package com.r3.dbfailure.schemas + +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.PersistentState +import java.util.* +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Table + +object DbFailureSchema + +object DbFailureSchemaV1 : MappedSchema( + schemaFamily = DbFailureSchema.javaClass, + version = 1, + mappedTypes = listOf(DbFailureSchemaV1.PersistentTestState::class.java)){ + override val migrationResource = "dbfailure.changelog-master" + + @Entity + @Table( name = "fail_test_states") + class PersistentTestState( + @Column( name = "participant") + var participantName: String, + + @Column( name = "random_value") + var randomValue: String?, + + @Column( name = "error_target") + var errorTarget: Int, + + @Column( name = "linear_id") + var linearId: UUID + ) : PersistentState() { + constructor() : this( "", "", 0, UUID.randomUUID()) + } +} diff --git a/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-errortarget.xml b/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-errortarget.xml new file mode 100644 index 0000000000..ebc2450e11 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-errortarget.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-init.xml b/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-init.xml new file mode 100644 index 0000000000..c65f013bb1 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-init.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-master.xml b/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-master.xml new file mode 100644 index 0000000000..22b9ffda33 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfcontracts/src/main/resources/migration/dbfailure.changelog-master.xml @@ -0,0 +1,8 @@ + + + + + + diff --git a/testing/cordapps/dbfailure/dbfworkflows/build.gradle b/testing/cordapps/dbfailure/dbfworkflows/build.gradle new file mode 100644 index 0000000000..571a3cb3a5 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfworkflows/build.gradle @@ -0,0 +1,12 @@ +apply plugin: 'kotlin' +//apply plugin: 'net.corda.plugins.cordapp' +//apply plugin: 'net.corda.plugins.quasar-utils' + +dependencies { + compile project(":core") + compile project(":testing:cordapps:dbfailure:dbfcontracts") +} + +jar{ + baseName "testing-dbfailure-workflows" +} \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt new file mode 100644 index 0000000000..6877154dd5 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt @@ -0,0 +1,87 @@ +package com.r3.dbfailure.workflows + +import co.paralleluniverse.fibers.Suspendable +import com.r3.dbfailure.contracts.DbFailureContract +import net.corda.core.contracts.Command +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.transactions.TransactionBuilder + +object CreateStateFlow { + + enum class ErrorTarget(val targetNumber: Int) { + NoError(0), + ServiceSqlSyntaxError(1), + ServiceNullConstraintViolation(2), + ServiceValidUpdate(3), + ServiceReadState(4), + ServiceCheckForState(5), + ServiceThrowInvalidParameter(6), + TxInvalidState(10), + FlowSwallowErrors(100), + ServiceSwallowErrors(1000) + } + + fun errorTargetsToNum(vararg targets: ErrorTarget): Int { + return targets.map { it.targetNumber }.sum() + } + + private val targetMap = ErrorTarget.values().associateBy(ErrorTarget::targetNumber) + + fun getServiceTarget(target: Int?): ErrorTarget { + return target?.let { targetMap.getValue(it % 10) } ?: CreateStateFlow.ErrorTarget.NoError + } + + fun getServiceExceptionHandlingTarget(target: Int?): ErrorTarget { + return target?.let { targetMap.getValue(((it / 1000) % 10) * 1000) } ?: CreateStateFlow.ErrorTarget.NoError + } + + fun getTxTarget(target: Int?): ErrorTarget { + return target?.let { targetMap.getValue(((it / 10) % 10) * 10) } ?: CreateStateFlow.ErrorTarget.NoError + } + + fun getFlowTarget(target: Int?): ErrorTarget { + return target?.let { targetMap.getValue(((it / 100) % 10) * 100) } ?: CreateStateFlow.ErrorTarget.NoError + } + + @InitiatingFlow + @StartableByRPC + class Initiator(private val randomValue: String, private val errorTarget: Int) : FlowLogic() { + + @Suspendable + override fun call(): UniqueIdentifier { + logger.info("Test flow: starting") + val notary = serviceHub.networkMapCache.notaryIdentities[0] + val txTarget = getTxTarget(errorTarget) + logger.info("Test flow: The tx error target is $txTarget") + val state = DbFailureContract.TestState(UniqueIdentifier(), ourIdentity, if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue, errorTarget) + val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey) + + logger.info("Test flow: tx builder") + val txBuilder = TransactionBuilder(notary) + .addOutputState(state) + .addCommand(txCommand) + + logger.info("Test flow: verify") + txBuilder.verify(serviceHub) + + val signedTx = serviceHub.signInitialTransaction(txBuilder) + + try { + logger.info("Test flow: recording transaction") + serviceHub.recordTransactions(signedTx) + } catch (t: Throwable) { + if (getFlowTarget(errorTarget) == CreateStateFlow.ErrorTarget.FlowSwallowErrors) { + logger.info("Test flow: Swallowing all exception! Muahahaha!", t) + } else { + logger.info("Test flow: caught exception - rethrowing") + throw t + } + } + logger.info("Test flow: returning") + return state.linearId + } + } +} \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt new file mode 100644 index 0000000000..dcc09c1eb9 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt @@ -0,0 +1,92 @@ +package com.r3.dbfailure.workflows + +import com.r3.dbfailure.contracts.DbFailureContract +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.contextLogger +import java.security.InvalidParameterException + +@CordaService +class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() { + + companion object { + val log = contextLogger() + } + + init { + services.vaultService.rawUpdates.subscribe { (_, produced) -> + produced.forEach { + val contractState = it.state.data as? DbFailureContract.TestState + try { + when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) { + CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> { + log.info("Fail with syntax error on raw statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "BLAAA RANDOM_VALUE = NULL\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> { + log.info("Fail with null constraint violation on raw statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "SET RANDOM_VALUE = NULL\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceValidUpdate -> { + log.info("Update current statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceReadState -> { + log.info("Read current state from db") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "SELECT * FROM FAIL_TEST_STATES \n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceCheckForState -> { + log.info("Check for currently written state in the db") + val session = services.jdbcSession() + val statement = session.createStatement() + val rs = statement.executeQuery( + "SELECT COUNT(*) FROM FAIL_TEST_STATES \n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0 + log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}") + } + CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> { + log.info("Throw InvalidParameterException") + throw InvalidParameterException("Toys out of pram") + } + } + } catch (t: Throwable) { + if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget) == CreateStateFlow.ErrorTarget.ServiceSwallowErrors) { + log.warn("Service not letting errors escape", t) + } else { + throw t + } + } + } + } + } +} \ No newline at end of file