From 6d3a6a39984d7e80d9390d7a66d53b3e87a825ed Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Mon, 8 Jul 2019 11:07:50 +0100 Subject: [PATCH] ENT-3710 Add `TransientConnectionCardiologist` to Flow Hospital (#5277) --- docs/source/node-flow-hospital.rst | 21 ++- .../net/corda/node/flows/FlowRetryTest.kt | 155 ++++++++++++++++++ .../statemachine/StaffedFlowHospital.kt | 52 +++++- 3 files changed, 212 insertions(+), 16 deletions(-) diff --git a/docs/source/node-flow-hospital.rst b/docs/source/node-flow-hospital.rst index 02a86ce92f..e2338afdbf 100644 --- a/docs/source/node-flow-hospital.rst +++ b/docs/source/node-flow-hospital.rst @@ -44,25 +44,30 @@ Specifically, there are two main ways a flow is hospitalized: 2. Once started, if a flow experiences an error, the following failure scenarios are handled: - * SQLException mentioning a deadlock*: - if this happens, the flow will retry. If it retries more than once, a back off delay is applied to try and reduce contention. + * ``SQLException`` mentioning a **deadlock**: + If this happens, the flow will retry. If it retries more than once, a back off delay is applied to try and reduce contention. Current policy means these types of failed flows will retry forever (unless explicitly killed). No intervention required. - * Database constraint violation: - this scenario may occur due to natural contention between racing flows as Corda delegates handling using the database's optimistic concurrency control. + * **Database constraint violation** (``ConstraintViolationException``): + This scenario may occur due to natural contention between racing flows as Corda delegates handling using the database's optimistic concurrency control. As the likelihood of re-occurrence should be low, the flow will actually error and fail if it experiences this at the same point more than 3 times. No intervention required. 
- * Finality Flow handling - Corda 3.x (old style) ``FinalityFlow`` and Corda 4.x ``ReceiveFinalityFlow`` handling: - if on the receive side of the finality flow, any error will result in the flow being kept in for observation to allow the cause of the + * **Finality Flow handling** - Corda 3.x (old style) ``FinalityFlow`` and Corda 4.x ``ReceiveFinalityFlow`` handling: + If on the receive side of the finality flow, any error will result in the flow being kept in for observation to allow the cause of the error to be rectified (so that the transaction isn’t lost if, for example, associated contract JARs are missing). Intervention is expected to be “rectify error, perhaps uploading attachment, and restart node” (or alternatively reject and call `killFlow`). - * `FlowTimeoutException`: - this is used internally by the notary client flow when talking to an HA notary. It’s used to cause the client to try and talk to a different + * ``FlowTimeoutException``: + This is used internally by the notary client flow when talking to an HA notary. It’s used to cause the client to try and talk to a different member of the notary cluster if it doesn't hear back from the original member it sent the request to within a “reasonable” time. The time is hard to document as the notary members, if actually alive, will inform the requester of the ETA of a response. This can occur an infinite number of times. i.e. we never give up notarising. No intervention required. + * ``SQLTransientConnectionException``: + Database connection pooling errors are dealt with. If this exception occurs, the flow will retry. After 3 retries, the errored flow is kept in for observation. + +.. note:: Flows that are kept in for observation are retried upon node restart. 
+ Futures ------- diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt index 2719379beb..fd1d37edeb 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt @@ -29,8 +29,13 @@ import org.junit.Before import org.junit.Test import java.lang.management.ManagementFactory import java.sql.SQLException +import java.sql.SQLTransientConnectionException +import java.time.Duration +import java.time.temporal.ChronoUnit import java.util.* +import java.util.concurrent.TimeoutException import kotlin.test.assertEquals +import kotlin.test.assertFailsWith import kotlin.test.assertNotNull class FlowRetryTest { @@ -38,6 +43,9 @@ class FlowRetryTest { fun resetCounters() { InitiatorFlow.seen.clear() InitiatedFlow.seen.clear() + TransientConnectionFailureFlow.retryCount = -1 + WrappedTransientConnectionFailureFlow.retryCount = -1 + GeneralExternalFailureFlow.retryCount = -1 } @Test @@ -111,6 +119,63 @@ class FlowRetryTest { } } } + + @Test + fun `SQLTransientConnectionExceptions thrown by hikari are retried 3 times and then kept in the checkpoints table`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + assertFailsWith { + it.proxy.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity()) + .returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS)) + } + assertEquals(3, TransientConnectionFailureFlow.retryCount) + // 1 for the errored flow kept for observation and another for 
GetNumberOfCheckpointsFlow + assertEquals(2, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + } + + @Test + fun `Specific exception still detected even if it is nested inside another exception`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + assertFailsWith { + it.proxy.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity()) + .returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS)) + } + assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount) + // 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow + assertEquals(2, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + } + + @Test + fun `General external exceptions are not retried and propagate`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + assertFailsWith { + it.proxy.startFlow(::GeneralExternalFailureFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow() + } + assertEquals(0, GeneralExternalFailureFlow.retryCount) + // only 1 checkpoint: the failed flow is not kept for observation, leaving just the one for GetNumberOfCheckpointsFlow + assertEquals(1, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + } }
fun isQuasarAgentSpecified(): Boolean { @@ -286,3 +351,93 @@ class ThrowingFlow() : FlowLogic(), IdempotentFlow { return "Result" } } + +@StartableByRPC +@InitiatingFlow +class TransientConnectionFailureFlow(private val party: Party) : FlowLogic() { + companion object { + // start negative due to where it is incremented + var retryCount = -1 + } + + @Suspendable + override fun call() { + initiateFlow(party).send("hello there") + // checkpoint will restart the flow after the send + retryCount += 1 + throw SQLTransientConnectionException("Connection is not available") + } +} + +@InitiatedBy(TransientConnectionFailureFlow::class) +class TransientConnectionFailureResponder(private val session: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + session.receive().unwrap { it } + } +} + +@StartableByRPC +@InitiatingFlow +class WrappedTransientConnectionFailureFlow(private val party: Party) : FlowLogic() { + companion object { + // start negative due to where it is incremented + var retryCount = -1 + } + + @Suspendable + override fun call() { + initiateFlow(party).send("hello there") + // checkpoint will restart the flow after the send + retryCount += 1 + throw IllegalStateException("wrapped error message", IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available")/*.fillInStackTrace()*/)) + } +} + +@InitiatedBy(WrappedTransientConnectionFailureFlow::class) +class WrappedTransientConnectionFailureResponder(private val session: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + session.receive().unwrap { it } + } +} + +@StartableByRPC +@InitiatingFlow +class GeneralExternalFailureFlow(private val party: Party) : FlowLogic() { + companion object { + // start negative due to where it is incremented + var retryCount = -1 + } + + @Suspendable + override fun call() { + initiateFlow(party).send("hello there") + // checkpoint will restart the flow after the send + retryCount += 1 + throw 
IllegalStateException("Some user general exception") + } +} + +@InitiatedBy(GeneralExternalFailureFlow::class) +class GeneralExternalFailureResponder(private val session: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + session.receive().unwrap { it } + } +} + +@StartableByRPC +class GetNumberOfCheckpointsFlow : FlowLogic() { + override fun call(): Long { + return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps -> + ps.executeQuery().use { rs -> + rs.next() + rs.getLong(1) + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 21f280f2d3..da80316ae5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -15,6 +15,7 @@ import net.corda.nodeapi.internal.cryptoservice.TimedCryptoServiceException import org.hibernate.exception.ConstraintViolationException import rx.subjects.PublishSubject import java.sql.SQLException +import java.sql.SQLTransientConnectionException import java.time.Duration import java.time.Instant import java.util.* @@ -26,7 +27,14 @@ import kotlin.math.pow class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) { private companion object { private val log = contextLogger() - private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, CryptoServiceTimeout, FinalityDoctor) + private val staff = listOf( + DeadlockNurse, + DuplicateInsertSpecialist, + DoctorTimeout, + CryptoServiceTimeout, + FinalityDoctor, + TransientConnectionCardiologist + ) } private val mutex = ThreadBox(object { @@ -269,8 +277,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val } private fun mentionsDeadlock(exception: 
Throwable?): Boolean { - return exception != null && (exception is SQLException && ((exception.message?.toLowerCase()?.contains("deadlock") - ?: false)) || mentionsDeadlock(exception.cause)) + return exception.mentionsThrowable(SQLException::class.java, "deadlock") } } @@ -279,16 +286,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val */ object DuplicateInsertSpecialist : Staff { override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { - return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) { + return if (newError.mentionsThrowable(ConstraintViolationException::class.java) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) { Diagnosis.DISCHARGE } else { Diagnosis.NOT_MY_SPECIALTY } } - - private fun mentionsConstraintViolation(exception: Throwable?): Boolean { - return exception != null && (exception is ConstraintViolationException || mentionsConstraintViolation(exception.cause)) - } } /** @@ -337,4 +340,37 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val return throwable.stackTrace.any { it.className == ReceiveFinalityFlow::class.java.name } } } + + /** + * Detects [SQLTransientConnectionException]s that arise from failing to connect to the underlying database/datasource + */ + object TransientConnectionCardiologist : Staff { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { + return if (mentionsTransientConnection(newError)) { + if (history.notDischargedForTheSameThingMoreThan(2, this, currentState)) { + Diagnosis.DISCHARGE + } else { + Diagnosis.OVERNIGHT_OBSERVATION + } + } else { + Diagnosis.NOT_MY_SPECIALTY + } + } + + private fun mentionsTransientConnection(exception: Throwable?): Boolean { + return
exception.mentionsThrowable(SQLTransientConnectionException::class.java, "connection is not available") + } + } } + +private fun Throwable?.mentionsThrowable(exceptionType: Class, errorMessage: String? = null): Boolean { + if (this == null) { + return false + } + val containsMessage = if (errorMessage != null) { + message?.toLowerCase()?.contains(errorMessage) ?: false + } else { + true + } + return (exceptionType.isAssignableFrom(this::class.java) && containsMessage) || cause.mentionsThrowable(exceptionType, errorMessage) +} \ No newline at end of file