mirror of
https://github.com/corda/corda.git
synced 2025-06-23 09:25:36 +00:00
Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber
This commit is contained in:
@ -49,6 +49,7 @@ class VaultObserverExceptionTest {
|
|||||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
||||||
StaffedFlowHospital.onFlowAdmitted.clear()
|
StaffedFlowHospital.onFlowAdmitted.clear()
|
||||||
DbListenerService.onError = null
|
DbListenerService.onError = null
|
||||||
|
DbListenerService.safeSubscription = true
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -84,6 +85,40 @@ class VaultObserverExceptionTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Causing an SqlException via a syntax error in a vault observer causes the flow to hit the
|
||||||
|
* DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation - Unsafe subscribe
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
fun unhandledSqlExceptionFromVaultObserverGetsHospitalised_UnsafeSubscription() {
|
||||||
|
DbListenerService.safeSubscription = false
|
||||||
|
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
|
||||||
|
|
||||||
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||||
|
when (it) {
|
||||||
|
is SQLException -> {
|
||||||
|
testControlFuture.complete(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
driver(DriverParameters(
|
||||||
|
startNodesInProcess = true,
|
||||||
|
cordappsForAllNodes = testCordapps())) {
|
||||||
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
|
aliceNode.rpc.startFlow(
|
||||||
|
::Initiator,
|
||||||
|
"Syntax Error in Custom SQL",
|
||||||
|
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
|
||||||
|
).returnValue.then { testControlFuture.complete(false) }
|
||||||
|
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
|
||||||
|
|
||||||
|
Assert.assertTrue(foundExpectedException)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* None exception thrown from a vault observer can be suppressible in the flow that triggered the observer
|
* None exception thrown from a vault observer can be suppressible in the flow that triggered the observer
|
||||||
* because the recording of transaction states failed. The flow will be hospitalized.
|
* because the recording of transaction states failed. The flow will be hospitalized.
|
||||||
|
@ -414,7 +414,7 @@ class NodeVaultService(
|
|||||||
HospitalizeFlowException(wrapped)
|
HospitalizeFlowException(wrapped)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} ?: HospitalizeFlowException(e)
|
} ?: (e as? SQLException ?: (e as? PersistenceException ?: HospitalizeFlowException(e)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ import net.corda.core.node.services.CordaService
|
|||||||
import net.corda.core.node.services.Vault
|
import net.corda.core.node.services.Vault
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
|
import rx.observers.Subscribers
|
||||||
import java.security.InvalidParameterException
|
import java.security.InvalidParameterException
|
||||||
|
|
||||||
@CordaService
|
@CordaService
|
||||||
@ -20,6 +21,7 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
|
|
||||||
// make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm)
|
// make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm)
|
||||||
var throwUnrecoverableError = false
|
var throwUnrecoverableError = false
|
||||||
|
var safeSubscription = true
|
||||||
}
|
}
|
||||||
|
|
||||||
init {
|
init {
|
||||||
@ -119,7 +121,11 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
if (onError != null) {
|
if (onError != null) {
|
||||||
services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined
|
services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined
|
||||||
} else {
|
} else {
|
||||||
services.vaultService.rawUpdates.subscribe(onNext)
|
if (safeSubscription) {
|
||||||
|
services.vaultService.rawUpdates.subscribe(onNext)
|
||||||
|
} else {
|
||||||
|
services.vaultService.rawUpdates.unsafeSubscribe(Subscribers.create(onNext))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user