From c7b8af3fa6f94c5451924f5ba8c287e6472a39a7 Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Fri, 31 Jan 2020 03:56:08 +0000 Subject: [PATCH] Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber --- .../vault/VaultObserverExceptionTest.kt | 35 +++++++++++++++++++ .../node/services/vault/NodeVaultService.kt | 2 +- .../dbfailure/workflows/DbListenerService.kt | 8 ++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt index f3557bf0ba..7691feb27c 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -49,6 +49,7 @@ class VaultObserverExceptionTest { StaffedFlowHospital.onFlowKeptForOvernightObservation.clear() StaffedFlowHospital.onFlowAdmitted.clear() DbListenerService.onError = null + DbListenerService.safeSubscription = true } /** @@ -84,6 +85,40 @@ class VaultObserverExceptionTest { } } + /** + * Causing an SqlException via a syntax error in a vault observer causes the flow to hit the + * DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation - Unsafe subscribe + */ + @Test + fun unhandledSqlExceptionFromVaultObserverGetsHospitalised_UnsafeSubscription() { + DbListenerService.safeSubscription = false + val testControlFuture = openFuture().toCompletableFuture() + + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { + when (it) { + is SQLException -> { + testControlFuture.complete(true) + } + } + false + } + + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = testCordapps())) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + aliceNode.rpc.startFlow( + ::Initiator, + "Syntax Error in Custom SQL", + CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError) + ).returnValue.then { testControlFuture.complete(false) } + val foundExpectedException = testControlFuture.getOrThrow(30.seconds) + + Assert.assertTrue(foundExpectedException) + } + } + /** * None exception thrown from a vault observer can be suppressible in the flow that triggered the observer * because the recording of transaction states failed. The flow will be hospitalized. diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index e60fd10cb4..d68d558cd1 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -414,7 +414,7 @@ class NodeVaultService( HospitalizeFlowException(wrapped) } } - } ?: HospitalizeFlowException(e) + } ?: (e as? SQLException ?: (e as? PersistenceException ?: HospitalizeFlowException(e))) } } } diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt index af14b50307..df8445c8e0 100644 --- a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt @@ -9,6 +9,7 @@ import net.corda.core.node.services.CordaService import net.corda.core.node.services.Vault import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger +import rx.observers.Subscribers import java.security.InvalidParameterException @CordaService @@ -20,6 +21,7 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() { // make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm) var throwUnrecoverableError = false + var safeSubscription = true } init { @@ -119,7 +121,11 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() { if (onError != null) { services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined } else { - services.vaultService.rawUpdates.subscribe(onNext) + if (safeSubscription) { + services.vaultService.rawUpdates.subscribe(onNext) + } else { + services.vaultService.rawUpdates.unsafeSubscribe(Subscribers.create(onNext)) + } } }