CORDA-3194 Relax duplicate insert flow hospital handling (#2643)

* CORDA-3194 Relax duplicate insert flow hospital handling

Revert a previous change to now make the duplicate insert staff member to
give a diagnosis of discharge or not my speciality (previously gave
terminal).

This is to prevent duplicate insert handling from overriding finality
flow error handling.
This commit is contained in:
Dan Newton 2019-10-11 14:29:45 +01:00 committed by LankyDan
parent 877ce5587f
commit a591c8e25b
5 changed files with 78 additions and 63 deletions

View File

@ -1143,13 +1143,12 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
* Throws a [ConstraintViolationException] when performing an [Action.CommitTransaction] event when the flow is finishing.
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the error
* propagates and the flow ends.
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and fail if error persists`() {
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() {
driver(
DriverParameters(
startNodesInProcess = false,
@ -1192,7 +1191,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 5
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 4
DO incrementCounter("counter");
clear("remove_checkpoint_flag");
traceln("Throwing exception");
@ -1215,20 +1214,12 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
RULE Increment not my speciality counter
CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
AT READ NOT_MY_SPECIALTY
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
DO traceln("Byteman test - not my speciality")
ENDRULE
""".trimIndent()
@ -1237,7 +1228,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<StateTransitionException> {
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
@ -1249,15 +1240,14 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(4, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - terminal") }.size)
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - not my speciality") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(4, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for errored flow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.vault
import com.r3.dbfailure.workflows.CreateStateFlow
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum
import net.corda.core.CordaRuntimeException
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.startFlow
@ -10,7 +11,6 @@ import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.node.services.statemachine.StateTransitionException
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverParameters
@ -25,6 +25,9 @@ import org.junit.ClassRule
import org.junit.Test
import rx.exceptions.OnErrorNotImplementedException
import java.sql.SQLException
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import javax.persistence.PersistenceException
import kotlin.test.assertFailsWith
@ -115,13 +118,21 @@ class VaultObserverExceptionTest : IntegrationTest() {
/**
* If the state we are trying to persist triggers a ConstraintViolation, the flow hospital will retry the flow
* and finally give up on it
* and keep it in for observation if errors persist.
*/
@Test
fun constraintViolationOnCommitGetsRetriedAndFinallyFails() {
var counter = 0
fun constraintViolationOnCommitGetsRetriedAndThenGetsKeptForObservation() {
var admitted = 0
var discharged = 0
var observation = 0
StaffedFlowHospital.onFlowAdmitted.add {
++counter
++admitted
}
StaffedFlowHospital.onFlowDischarged.add { _, _ ->
++discharged
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observation
}
driver(DriverParameters(
@ -129,20 +140,25 @@ class VaultObserverExceptionTest : IntegrationTest() {
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
assertFailsWith(StateTransitionException::class, "could not execute statement") {
aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState))
.returnValue.getOrThrow()
assertFailsWith<TimeoutException> {
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState))
.returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS))
}
}
Assert.assertTrue("Exception from service has not been to Hospital", counter > 0)
Assert.assertTrue("Exception from service has not been to Hospital", admitted > 0)
Assert.assertEquals(3, discharged)
Assert.assertEquals(1, observation)
}
/**
* If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in
* the vault observer will trigger a flush that throws. This will be retried, and finally fail.
* the vault observer will trigger a flush that throws. This will be retried, and finally be kept in for observation.
*
* 4 discharges due to being handled once by [StaffedFlowHospital.DuplicateInsertSpecialist] and 3 times by
* [StaffedFlowHospital.TransitionErrorGeneralPractitioner]
*/
@Test
fun constraintViolationOnFlushGetsRetriedAndFinallyFails() {
fun constraintViolationOnFlushGetsRetriedAndThenGetsKeptForObservation() {
var counter = 0
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
when (it) {
@ -154,18 +170,30 @@ class VaultObserverExceptionTest : IntegrationTest() {
}
false
}
var discharged = 0
var observation = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ ->
++discharged
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observation
}
driver(DriverParameters(
startNodesInProcess = true,
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
assertFailsWith<StateTransitionException>("ConstraintViolationException") {
aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState))
assertFailsWith<TimeoutException>("ConstraintViolationException") {
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceValidUpdate,
CreateStateFlow.ErrorTarget.TxInvalidState))
.returnValue.getOrThrow(30.seconds)
}
}
Assert.assertTrue("Flow has not been to hospital", counter > 0)
Assert.assertEquals(4, discharged)
Assert.assertEquals(1, observation)
}
/**
@ -197,7 +225,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState, CreateStateFlow.ErrorTarget.FlowSwallowErrors))
val flowResult = flowHandle.returnValue
assertFailsWith<StateTransitionException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
assertFailsWith<TimeoutException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
Assert.assertTrue("Flow has not been to hospital", counter > 0)
}
}
@ -206,8 +234,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
* If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in
* the vault observer will trigger a flush that throws. This will be retried, and finally fail.
* Trying to catch and suppress that exception inside the service does protect the service, but the new
* interceptor will fail the flow anyway. It will be retried and finally fail when the hospital gives
* up retrying.
* interceptor will fail the flow anyway. It will be retried and then be kept in for observation if errors persist.
*/
@Test
fun constraintViolationOnFlushInVaultObserverCannotBeSuppressedInService() {
@ -233,7 +260,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
CreateStateFlow.ErrorTarget.TxInvalidState,
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
val flowResult = flowHandle.returnValue
assertFailsWith<CordaRuntimeException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
assertFailsWith<TimeoutException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
Assert.assertTrue("Flow has not been to hospital", counter > 0)
}
}

View File

@ -356,12 +356,9 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
*/
object DuplicateInsertSpecialist : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (newError.mentionsThrowable(ConstraintViolationException::class.java)) {
if(history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
Diagnosis.DISCHARGE
} else {
Diagnosis.TERMINAL
}
return if (newError.mentionsThrowable(ConstraintViolationException::class.java)
&& history.notDischargedForTheSameThingMoreThan(2, this, currentState)) {
Diagnosis.DISCHARGE
} else {
Diagnosis.NOT_MY_SPECIALTY
}

View File

@ -2,14 +2,17 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.*
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.FinalityHandler
import net.corda.node.services.messaging.Message
@ -17,11 +20,13 @@ import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.testing.common.internal.eventually
import net.corda.testing.core.TestIdentity
import net.corda.testing.node.internal.*
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.MessagingServiceSpy
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.newContext
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.h2.util.Utils
import org.hibernate.exception.ConstraintViolationException
import org.junit.After
import org.junit.Assert.assertTrue
import org.junit.Before
@ -69,14 +74,6 @@ class RetryFlowMockTest {
assertEquals(2, RetryFlow.count)
}
@Test
fun `Retry forever`() {
assertThatThrownBy {
nodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow()
}.isInstanceOf(LimitedRetryCausingError::class.java)
assertEquals(5, RetryFlow.count)
}
@Test
fun `Retry does not set senderUUID`() {
val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
@ -188,7 +185,7 @@ class RetryFlowMockTest {
}
class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint")
class LimitedRetryCausingError : IllegalStateException("I am going to live forever")
class RetryCausingError : SQLException("deadlock")

View File

@ -6,6 +6,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.services.queryBy
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.DummyCommandData
import net.corda.testing.core.singleIdentity
@ -21,7 +22,10 @@ import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
class VaultFlowTest {
@ -56,8 +60,8 @@ class VaultFlowTest {
fun `Unique column constraint failing causes states to not persist to vaults`() {
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add( { t: Throwable -> t is javax.persistence.PersistenceException })
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
Assertions.assertThatExceptionOfType(ExecutionException::class.java).isThrownBy {
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
Assertions.assertThatExceptionOfType(TimeoutException::class.java).isThrownBy {
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
assertEquals(1, partyA.transaction {
partyA.services.vaultService.queryBy<UniqueDummyLinearContract.State>().states.size