Added test, asserting FlowSafeSubscriber is alive and re-accessed upon flow retry

This commit is contained in:
Kyriakos Tharrouniatis 2020-02-13 18:19:01 +00:00
parent bc79ce8058
commit 0fb49e9ed2
3 changed files with 46 additions and 12 deletions

View File

@ -736,7 +736,6 @@ class VaultObserverExceptionTest {
@Test
fun `Subscribing to NodeVaultService rawUpdates from a flow is not allowed` () {
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
@ -756,10 +755,38 @@ class VaultObserverExceptionTest {
}
}
//TODO add retry from checkpoint test
@Test
fun `Failing Observer wrapped with FlowSafeSubscriber will survive and be re-called upon flow retry`() {
var onNextCount = 0
var onErrorCount = 0
DbListenerService.onNextVisited = { _ -> onNextCount++ }
DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it}
DbListenerService.onErrorVisited = { _ -> onErrorCount++ }
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.transactionfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
assertFailsWith<TimeoutException> {
aliceNode.rpc.startFlow(
ErrorHandling::CheckpointAfterErrorFlow,
CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceConstraintViolationException,
CreateStateFlow.ErrorTarget.FlowSwallowErrors
)
).returnValue.getOrThrow(20.seconds)
}
assertEquals(4, onNextCount)
assertEquals(4, onErrorCount)
}
}
private fun NodeHandle.getNotarisedTransactionIds(): List<String> {

View File

@ -20,15 +20,16 @@ object CreateStateFlow {
// 1000s control exception handlling in the service/vault listener
enum class ErrorTarget(val targetNumber: Int) {
NoError(0),
ServiceSqlSyntaxError(1),
ServiceNullConstraintViolation(2),
ServiceValidUpdate(3),
ServiceReadState(4),
ServiceCheckForState(5),
ServiceThrowInvalidParameter(6),
ServiceThrowMotherOfAllExceptions(7),
ServiceThrowUnrecoverableError(8),
ServiceSqlSyntaxErrorOnConsumed(9),
ServiceSqlSyntaxError(10000),
ServiceNullConstraintViolation(20000),
ServiceValidUpdate(30000),
ServiceReadState(40000),
ServiceCheckForState(50000),
ServiceThrowInvalidParameter(60000),
ServiceThrowMotherOfAllExceptions(70000),
ServiceThrowUnrecoverableError(80000),
ServiceSqlSyntaxErrorOnConsumed(90000),
ServiceConstraintViolationException(1000000),
TxInvalidState(10),
FlowSwallowErrors(100),
ServiceSwallowErrors(1000)
@ -41,7 +42,7 @@ object CreateStateFlow {
private val targetMap = ErrorTarget.values().associateBy(ErrorTarget::targetNumber)
fun getServiceTarget(target: Int?): ErrorTarget {
return target?.let { targetMap.getValue(it % 10) } ?: CreateStateFlow.ErrorTarget.NoError
return target?.let { targetMap.getValue(((it/10000) % 1000)*10000) } ?: CreateStateFlow.ErrorTarget.NoError
}
fun getServiceExceptionHandlingTarget(target: Int?): ErrorTarget {

View File

@ -10,9 +10,11 @@ import net.corda.core.node.services.CordaService
import net.corda.core.node.services.Vault
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import org.hibernate.exception.ConstraintViolationException
import rx.observers.Subscribers
import java.lang.IllegalStateException
import java.security.InvalidParameterException
import java.sql.SQLException
@CordaService
class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
@ -114,6 +116,10 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
throw OutOfMemoryError("Unrecoverable error")
}
}
CreateStateFlow.ErrorTarget.ServiceConstraintViolationException -> {
log.info("Throw ConstraintViolationException")
throw ConstraintViolationException("Dummy Hibernate Exception ", SQLException(), " Will cause flow retry!")
}
else -> {
// do nothing, everything else must be handled elsewhere
}