diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt index 2e24b09e66..1fdd41759a 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt @@ -1143,13 +1143,12 @@ class StatemachineErrorHandlingTest : IntegrationTest() { * Throws a [ConstraintViolationException] when performing an [Action.CommitTransaction] event when the flow is finishing. * The exception is thrown 4 times. * - * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the error - * propagates and the flow ends. + * This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation. * * Each time the flow retries, it begins from the previous checkpoint where it suspended before failing. */ @Test - fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and fail if error persists`() { + fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() { driver( DriverParameters( startNodesInProcess = false, @@ -1192,7 +1191,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CLASS ${ActionExecutorImpl::class.java.name} METHOD executeCommitTransaction AT ENTRY - IF flagged("remove_checkpoint_flag") && readCounter("counter") < 5 + IF flagged("remove_checkpoint_flag") && readCounter("counter") < 4 DO incrementCounter("counter"); clear("remove_checkpoint_flag"); traceln("Throwing exception"); @@ -1215,20 +1214,12 @@ class StatemachineErrorHandlingTest : IntegrationTest() { DO traceln("Byteman test - discharging") ENDRULE - RULE Increment observation counter + RULE Increment not my speciality counter CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name} METHOD consult - AT READ OVERNIGHT_OBSERVATION + AT READ NOT_MY_SPECIALTY IF true - DO traceln("Byteman test - overnight observation") - ENDRULE - - RULE Increment terminal counter - CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name} - METHOD consult - AT READ TERMINAL - IF true - DO traceln("Byteman test - terminal") + DO traceln("Byteman test - not my speciality") ENDRULE """.trimIndent() @@ -1237,7 +1228,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - assertFailsWith { + assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( Duration.of(30, ChronoUnit.SECONDS) ) @@ -1249,15 +1240,14 @@ class StatemachineErrorHandlingTest : IntegrationTest() { .readAllLines() // Check the stdout for the lines generated by byteman - assertEquals(4, output.filter { it.contains("Byteman test - discharging") }.size) - assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) - assertEquals(1, output.filter { it.contains("Byteman test - terminal") }.size) + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(1, output.filter { it.contains("Byteman test - not my speciality") }.size) val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() - assertEquals(4, discharge) - assertEquals(0, observation) - assertEquals(0, aliceClient.stateMachinesSnapshot().size) - // 1 for GetNumberOfCheckpointsFlow - assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + assertEquals(3, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + // 1 for errored flow and 1 for GetNumberOfCheckpointsFlow + assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt index 7aefce864d..7c5b3a68eb 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -2,6 +2,7 @@ package net.corda.node.services.vault import com.r3.dbfailure.workflows.CreateStateFlow import com.r3.dbfailure.workflows.CreateStateFlow.Initiator +import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum import net.corda.core.CordaRuntimeException import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.startFlow @@ -10,7 +11,6 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.Permissions import net.corda.node.services.statemachine.StaffedFlowHospital -import net.corda.node.services.statemachine.StateTransitionException import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.driver.DriverParameters @@ -25,6 +25,9 @@ import org.junit.ClassRule import org.junit.Test import rx.exceptions.OnErrorNotImplementedException import java.sql.SQLException +import java.time.Duration +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeoutException import javax.persistence.PersistenceException import kotlin.test.assertFailsWith @@ -115,13 +118,21 @@ class VaultObserverExceptionTest : IntegrationTest() { /** * If the state we are trying to persist triggers a ConstraintViolation, the flow hospital will retry the flow - * and finally give up on it + * and keep it in for observation if errors persist. */ @Test - fun constraintViolationOnCommitGetsRetriedAndFinallyFails() { - var counter = 0 + fun constraintViolationOnCommitGetsRetriedAndThenGetsKeptForObservation() { + var admitted = 0 + var discharged = 0 + var observation = 0 StaffedFlowHospital.onFlowAdmitted.add { - ++counter + ++admitted + } + StaffedFlowHospital.onFlowDischarged.add { _, _ -> + ++discharged + } + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> + ++observation } driver(DriverParameters( @@ -129,20 +140,25 @@ class VaultObserverExceptionTest : IntegrationTest() { cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - assertFailsWith(StateTransitionException::class, "could not execute statement") { - aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState)) - .returnValue.getOrThrow() + assertFailsWith { + aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState)) + .returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) } } - Assert.assertTrue("Exception from service has not been to Hospital", counter > 0) + Assert.assertTrue("Exception from service has not been to Hospital", admitted > 0) + Assert.assertEquals(3, discharged) + Assert.assertEquals(1, observation) } /** * If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in - * the vault observer will trigger a flush that throws. This will be retried, and finally fail. + * the vault observer will trigger a flush that throws. This will be retried, and finally be kept in for observation. + * + * 4 discharges due to being handled once by [StaffedFlowHospital.DuplicateInsertSpecialist] and 3 times by + * [StaffedFlowHospital.TransitionErrorGeneralPractitioner] */ @Test - fun constraintViolationOnFlushGetsRetriedAndFinallyFails() { + fun constraintViolationOnFlushGetsRetriedAndThenGetsKeptForObservation() { var counter = 0 StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { when (it) { @@ -154,18 +170,30 @@ class VaultObserverExceptionTest : IntegrationTest() { } false } + var discharged = 0 + var observation = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> + ++discharged + } + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> + ++observation + } driver(DriverParameters( startNodesInProcess = true, cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - assertFailsWith("ConstraintViolationException") { - aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState)) + assertFailsWith("ConstraintViolationException") { + aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceValidUpdate, + CreateStateFlow.ErrorTarget.TxInvalidState)) .returnValue.getOrThrow(30.seconds) } } Assert.assertTrue("Flow has not been to hospital", counter > 0) + Assert.assertEquals(4, discharged) + Assert.assertEquals(1, observation) } /** @@ -197,7 +225,7 @@ class VaultObserverExceptionTest : IntegrationTest() { val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState, CreateStateFlow.ErrorTarget.FlowSwallowErrors)) val flowResult = flowHandle.returnValue - assertFailsWith("ConstraintViolation") { flowResult.getOrThrow(30.seconds) } + assertFailsWith("ConstraintViolation") { flowResult.getOrThrow(30.seconds) } Assert.assertTrue("Flow has not been to hospital", counter > 0) } } @@ -206,8 +234,7 @@ class VaultObserverExceptionTest : IntegrationTest() { * If we have a state causing a ConstraintViolation lined up for persistence, calling jdbConnection() in * the vault observer will trigger a flush that throws. This will be retried, and finally fail. * Trying to catch and suppress that exception inside the service does protect the service, but the new - * interceptor will fail the flow anyway. It will be retried and finally fail when the hospital gives - * up retrying. + * interceptor will fail the flow anyway. It will be retried and then be kept in for observation if errors persist. */ @Test fun constraintViolationOnFlushInVaultObserverCannotBeSuppressedInService() { @@ -233,7 +260,7 @@ class VaultObserverExceptionTest : IntegrationTest() { CreateStateFlow.ErrorTarget.TxInvalidState, CreateStateFlow.ErrorTarget.ServiceSwallowErrors)) val flowResult = flowHandle.returnValue - assertFailsWith("ConstraintViolation") { flowResult.getOrThrow(30.seconds) } + assertFailsWith("ConstraintViolation") { flowResult.getOrThrow(30.seconds) } Assert.assertTrue("Flow has not been to hospital", counter > 0) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 42448e18f3..b02e718daf 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -356,12 +356,9 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, */ object DuplicateInsertSpecialist : Staff { override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { - return if (newError.mentionsThrowable(ConstraintViolationException::class.java)) { - if(history.notDischargedForTheSameThingMoreThan(3, this, currentState)) { - Diagnosis.DISCHARGE - } else { - Diagnosis.TERMINAL - } + return if (newError.mentionsThrowable(ConstraintViolationException::class.java) + && history.notDischargedForTheSameThingMoreThan(2, this, currentState)) { + Diagnosis.DISCHARGE } else { Diagnosis.NOT_MY_SPECIALTY } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index a1177baaf3..d4a440361b 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -2,14 +2,17 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import net.corda.core.concurrent.CordaFuture -import net.corda.core.flows.* +import net.corda.core.flows.FlowInfo +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.concurrent.flatMap import net.corda.core.messaging.MessageRecipients import net.corda.core.utilities.UntrustworthyData -import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.services.FinalityHandler import net.corda.node.services.messaging.Message @@ -17,11 +20,13 @@ import net.corda.node.services.persistence.DBTransactionStorage import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.testing.common.internal.eventually import net.corda.testing.core.TestIdentity -import net.corda.testing.node.internal.* +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.MessagingServiceSpy +import net.corda.testing.node.internal.TestStartedNode +import net.corda.testing.node.internal.enclosedCordapp +import net.corda.testing.node.internal.newContext import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatThrownBy import org.h2.util.Utils -import org.hibernate.exception.ConstraintViolationException import org.junit.After import org.junit.Assert.assertTrue import org.junit.Before @@ -69,14 +74,6 @@ class RetryFlowMockTest { assertEquals(2, RetryFlow.count) } - @Test - fun `Retry forever`() { - assertThatThrownBy { - nodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow() - }.isInstanceOf(LimitedRetryCausingError::class.java) - assertEquals(5, RetryFlow.count) - } - @Test fun `Retry does not set senderUUID`() { val messagesSent = Collections.synchronizedList(mutableListOf()) @@ -188,7 +185,7 @@ class RetryFlowMockTest { } - class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint") + class LimitedRetryCausingError : IllegalStateException("I am going to live forever") class RetryCausingError : SQLException("deadlock") diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt index 14bd91bc3b..4d1dba5297 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt @@ -6,6 +6,7 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.node.services.queryBy import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.getOrThrow import net.corda.node.services.statemachine.StaffedFlowHospital import net.corda.testing.core.DummyCommandData import net.corda.testing.core.singleIdentity @@ -21,7 +22,10 @@ import org.assertj.core.api.Assertions import org.junit.After import org.junit.Before import org.junit.Test +import java.time.Duration +import java.time.temporal.ChronoUnit import java.util.concurrent.ExecutionException +import java.util.concurrent.TimeoutException import kotlin.test.assertEquals class VaultFlowTest { @@ -56,8 +60,8 @@ class VaultFlowTest { fun `Unique column constraint failing causes states to not persist to vaults`() { StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add( { t: Throwable -> t is javax.persistence.PersistenceException }) partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get() - Assertions.assertThatExceptionOfType(ExecutionException::class.java).isThrownBy { - partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get() + Assertions.assertThatExceptionOfType(TimeoutException::class.java).isThrownBy { + partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).getOrThrow(Duration.of(10, ChronoUnit.SECONDS)) } assertEquals(1, partyA.transaction { partyA.services.vaultService.queryBy().states.size