From 938828b52fad7257b9cf4b8079a5f12b67adcbfe Mon Sep 17 00:00:00 2001 From: Christian Sailer Date: Mon, 16 Sep 2019 11:59:58 +0100 Subject: [PATCH] CORDA-3195 Default behaviour of FlowHospital (WIP) (#2520) * Add GP to flow hospital, and start working on a list of things the GP knows to be incurable. * Only hospitalise SQL and Persistence Exceptions (let's see if that is enough?), also rename to DatabaseDentist. * Disabled hospitalisation of SQL exceptions in flow retry tests * Fix RPC exception handling test by not using PersistenceException * Ignore flaky integration test * Code review: Rename staff member and add testing annotation * Revert compiler.xml --- .../net/corda/node/flows/FlowRetryTest.kt | 8 ++++ .../services/rpc/RpcExceptionHandlingTest.kt | 2 +- .../statemachine/StaffedFlowHospital.kt | 41 +++++++++++++------ .../statemachine/RetryFlowMockTest.kt | 3 ++ .../node/services/vault/VaultFlowTest.kt | 3 ++ 5 files changed, 43 insertions(+), 14 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt index 25c0b78326..7349959d91 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt @@ -17,6 +17,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.services.Permissions import net.corda.node.services.statemachine.FlowTimeoutException +import net.corda.node.services.statemachine.StaffedFlowHospital import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity @@ -25,6 +26,7 @@ import net.corda.testing.driver.driver import net.corda.testing.node.User import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.hibernate.exception.ConstraintViolationException +import org.junit.After import org.junit.Before import org.junit.Test import java.lang.management.ManagementFactory @@ -46,6 +48,12 @@ class FlowRetryTest { TransientConnectionFailureFlow.retryCount = -1 WrappedTransientConnectionFailureFlow.retryCount = -1 GeneralExternalFailureFlow.retryCount = -1 + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> true } + } + + @After + fun cleanUp() { + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear() } @Test diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt index 84c87d23cf..1c5c89d776 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt @@ -140,7 +140,7 @@ class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic().unwrap { it } - throw GenericJDBCException("Something went wrong!", SQLException("Oops!")) + throw ClientRelevantException("Something went wrong!", SQLException("Oops!")) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 476bd0829f..ef0c2d150a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -7,10 +7,7 @@ import net.corda.core.flows.ReceiveTransactionFlow import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.identity.Party -import net.corda.core.internal.DeclaredField -import net.corda.core.internal.ThreadBox -import net.corda.core.internal.TimedFlow -import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.* import net.corda.core.messaging.DataFeed import net.corda.core.utilities.contextLogger import net.corda.core.utilities.seconds @@ -22,6 +19,7 @@ import java.sql.SQLTransientConnectionException import java.time.Duration import java.time.Instant import java.util.* +import javax.persistence.PersistenceException import kotlin.math.pow /** @@ -31,11 +29,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val private companion object { private val log = contextLogger() private val staff = listOf( - DeadlockNurse, - DuplicateInsertSpecialist, - DoctorTimeout, - FinalityDoctor, - TransientConnectionCardiologist + DeadlockNurse, + DuplicateInsertSpecialist, + DoctorTimeout, + FinalityDoctor, + TransientConnectionCardiologist, + DatabaseEndocrinologist ) } @@ -259,7 +258,6 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val NOT_MY_SPECIALTY } - interface Staff { fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis } @@ -334,7 +332,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val } private fun isErrorPropagatedFromCounterparty(error: Throwable): Boolean { - return when(error) { + return when (error) { is UnexpectedFlowEndException -> { val peer = DeclaredField(UnexpectedFlowEndException::class.java, "peer", error).value peer != null @@ -361,7 +359,6 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val return strippedStacktrace.isNotEmpty() && strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!! ) } - } /** @@ -384,6 +381,23 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val return exception.mentionsThrowable(SQLTransientConnectionException::class.java, "connection is not available") } } + + /** + * Hospitalise any database (SQL) exception that wasn't handled otherwise, unless on the configurable whitelist + * Note that retry decisions from other specialists will not be affected as retries take precedence over hospitalisation. + */ + object DatabaseEndocrinologist : Staff { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { + return if (( newError is SQLException || newError is PersistenceException )&& !customConditions.any{it(newError)}) { + Diagnosis.OVERNIGHT_OBSERVATION + } else { + Diagnosis.NOT_MY_SPECIALTY + } + } + + @VisibleForTesting + val customConditions = mutableSetOf<(t: Throwable) -> Boolean>() + } } private fun Throwable?.mentionsThrowable(exceptionType: Class, errorMessage: String? = null): Boolean { @@ -396,4 +410,5 @@ private fun Throwable?.mentionsThrowable(exceptionType: Class true } return (exceptionType.isAssignableFrom(this::class.java) && containsMessage) || cause.mentionsThrowable(exceptionType, errorMessage) -} \ No newline at end of file +} + diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 95e2db570b..a1177baaf3 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -49,6 +49,8 @@ class RetryFlowMockTest { SendAndRetryFlow.count = 0 RetryInsertFlow.count = 0 KeepSendingFlow.count.set(0) + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is LimitedRetryCausingError } + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is RetryCausingError } } private fun TestStartedNode.startFlow(logic: FlowLogic): CordaFuture { @@ -58,6 +60,7 @@ class RetryFlowMockTest { @After fun cleanUp() { mockNet.stopNodes() + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear() } @Test diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt index 23359b1cfd..14bd91bc3b 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultFlowTest.kt @@ -6,6 +6,7 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.node.services.queryBy import net.corda.core.transactions.TransactionBuilder +import net.corda.node.services.statemachine.StaffedFlowHospital import net.corda.testing.core.DummyCommandData import net.corda.testing.core.singleIdentity import net.corda.testing.internal.vault.DUMMY_DEAL_PROGRAM_ID @@ -48,10 +49,12 @@ class VaultFlowTest { @After fun tearDown() { mockNetwork.stopNodes() + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear() } @Test fun `Unique column constraint failing causes states to not persist to vaults`() { + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add( { t: Throwable -> t is javax.persistence.PersistenceException }) partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get() Assertions.assertThatExceptionOfType(ExecutionException::class.java).isThrownBy { partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()