Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber

This commit is contained in:
Kyriakos Tharrouniatis 2020-01-31 03:56:08 +00:00
parent c84a79a895
commit c7b8af3fa6
3 changed files with 43 additions and 2 deletions

View File

@ -49,6 +49,7 @@ class VaultObserverExceptionTest {
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
StaffedFlowHospital.onFlowAdmitted.clear()
DbListenerService.onError = null
DbListenerService.safeSubscription = true
}
/**
@ -84,6 +85,40 @@ class VaultObserverExceptionTest {
}
}
/**
* Causing an SqlException via a syntax error in a vault observer causes the flow to hit the
* DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation - Unsafe subscribe
*/
@Test
fun unhandledSqlExceptionFromVaultObserverGetsHospitalised_UnsafeSubscription() {
DbListenerService.safeSubscription = false
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
when (it) {
is SQLException -> {
testControlFuture.complete(true)
}
}
false
}
driver(DriverParameters(
startNodesInProcess = true,
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(
::Initiator,
"Syntax Error in Custom SQL",
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
).returnValue.then { testControlFuture.complete(false) }
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
Assert.assertTrue(foundExpectedException)
}
}
/**
* None exception thrown from a vault observer can be suppressible in the flow that triggered the observer
* because the recording of transaction states failed. The flow will be hospitalized.

View File

@ -414,7 +414,7 @@ class NodeVaultService(
HospitalizeFlowException(wrapped)
}
}
} ?: HospitalizeFlowException(e)
} ?: (e as? SQLException ?: (e as? PersistenceException ?: HospitalizeFlowException(e)))
}
}
}

View File

@ -9,6 +9,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.node.services.Vault
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import rx.observers.Subscribers
import java.security.InvalidParameterException
@CordaService
@ -20,6 +21,7 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
// make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm)
var throwUnrecoverableError = false
var safeSubscription = true
}
init {
@ -119,7 +121,11 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
if (onError != null) {
services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined
} else {
if (safeSubscription) {
services.vaultService.rawUpdates.subscribe(onNext)
} else {
services.vaultService.rawUpdates.unsafeSubscribe(Subscribers.create(onNext))
}
}
}