ENT-3710 Add TransientConnectionCardiologist to Flow Hospital (#5277)

This commit is contained in:
Dan Newton 2019-07-08 11:07:50 +01:00 committed by Shams Asari
parent 306df8e219
commit 6d3a6a3998
3 changed files with 212 additions and 16 deletions

View File

@ -44,25 +44,30 @@ Specifically, there are two main ways a flow is hospitalized:
2. Once started, if a flow experiences an error, the following failure scenarios are handled:
* SQLException mentioning a deadlock*:
if this happens, the flow will retry. If it retries more than once, a back off delay is applied to try and reduce contention.
* ``SQLException`` mentioning a **deadlock**:
If this happens, the flow will retry. If it retries more than once, a back off delay is applied to try and reduce contention.
Current policy means these types of failed flows will retry forever (unless explicitly killed). No intervention required.
* Database constraint violation:
this scenario may occur due to natural contention between racing flows as Corda delegates handling using the database's optimistic concurrency control.
* **Database constraint violation** (``ConstraintViolationException``):
This scenario may occur due to natural contention between racing flows, as Corda delegates the handling of such contention to the database's optimistic concurrency control.
As the likelihood of re-occurrence should be low, the flow will actually error and fail if it experiences this at the same point more than 3 times. No intervention required.
* Finality Flow handling - Corda 3.x (old style) ``FinalityFlow`` and Corda 4.x ``ReceiveFinalityFlow`` handling:
if on the receive side of the finality flow, any error will result in the flow being kept in for observation to allow the cause of the
* **Finality Flow handling** - Corda 3.x (old style) ``FinalityFlow`` and Corda 4.x ``ReceiveFinalityFlow`` handling:
If on the receive side of the finality flow, any error will result in the flow being kept in for observation to allow the cause of the
error to be rectified (so that the transaction isn't lost if, for example, associated contract JARs are missing).
Intervention is expected to be “rectify error, perhaps uploading attachment, and restart node” (or alternatively reject and call ``killFlow``).
* `FlowTimeoutException`:
this is used internally by the notary client flow when talking to an HA notary. Its used to cause the client to try and talk to a different
* ``FlowTimeoutException``:
This is used internally by the notary client flow when talking to an HA notary. It's used to cause the client to try and talk to a different
member of the notary cluster if it doesn't hear back from the original member it sent the request to within a “reasonable” time.
The time is hard to document as the notary members, if actually alive, will inform the requester of the ETA of a response.
This can occur an infinite number of times, i.e. we never give up notarising. No intervention required.
* ``SQLTransientConnectionException``:
Database connection pooling errors are dealt with. If this exception occurs, the flow will retry. After retrying a number of times, the errored flow is kept in for observation.
.. note:: Flows that are kept in for observation are retried upon node restart.
Futures
-------

View File

@ -29,8 +29,13 @@ import org.junit.Before
import org.junit.Test
import java.lang.management.ManagementFactory
import java.sql.SQLException
import java.sql.SQLTransientConnectionException
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
class FlowRetryTest {
@ -38,6 +43,9 @@ class FlowRetryTest {
fun resetCounters() {
    // Empty the `seen` collections of both the initiated and the initiator flow.
    InitiatedFlow.seen.clear()
    InitiatorFlow.seen.clear()
    // The failure flows increment their counter on every run (including the first one),
    // so each counter is reset to -1 rather than 0.
    GeneralExternalFailureFlow.retryCount = -1
    WrappedTransientConnectionFailureFlow.retryCount = -1
    TransientConnectionFailureFlow.retryCount = -1
}
@Test
@ -111,6 +119,63 @@ class FlowRetryTest {
}
}
}
@Test
fun `SQLTransientConnectionExceptions thrown by hikari are retried 3 times and then kept in the checkpoints table`() {
    val rpcUser = User("mark", "dadada", setOf(Permissions.all()))
    driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
        val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
        val bob = startNode(providedName = BOB_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()

        CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).use { connection ->
            val rpc = connection.proxy
            // The flow never completes from the caller's point of view, so waiting on its result times out.
            assertFailsWith<TimeoutException> {
                rpc.startFlow(::TransientConnectionFailureFlow, bob.nodeInfo.singleIdentity())
                    .returnValue
                    .getOrThrow(Duration.ofSeconds(10))
            }
            assertEquals(3, TransientConnectionFailureFlow.retryCount)
            // Two checkpoints are expected: the errored flow kept for observation, plus the
            // GetNumberOfCheckpointsFlow that is performing the count.
            assertEquals(2, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
        }
    }
}
@Test
fun `Specific exception still detected even if it is nested inside another exception`() {
    val rpcUser = User("mark", "dadada", setOf(Permissions.all()))
    driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
        val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
        val bob = startNode(providedName = BOB_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()

        CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).use { connection ->
            val rpc = connection.proxy
            // The wrapped exception is handled like the unwrapped one: the result never arrives in time.
            assertFailsWith<TimeoutException> {
                rpc.startFlow(::WrappedTransientConnectionFailureFlow, bob.nodeInfo.singleIdentity())
                    .returnValue
                    .getOrThrow(Duration.ofSeconds(10))
            }
            assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount)
            // Two checkpoints are expected: the errored flow kept for observation, plus the
            // GetNumberOfCheckpointsFlow that is performing the count.
            assertEquals(2, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
        }
    }
}
@Test
fun `General external exceptions are not retried and propagate`() {
    val user = User("mark", "dadada", setOf(Permissions.all()))
    driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
        val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
        val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
        CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
            // A general exception is expected to reach the RPC caller instead of being retried.
            assertFailsWith<CordaRuntimeException> {
                it.proxy.startFlow(::GeneralExternalFailureFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
            }
            // The counter starts at -1 and is incremented once per run, so 0 means the flow ran exactly once.
            assertEquals(0, GeneralExternalFailureFlow.retryCount)
            // Only 1 checkpoint is expected here: presumably the one for GetNumberOfCheckpointsFlow itself,
            // since the failed flow is not kept for observation in this scenario.
            assertEquals(1, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
        }
    }
}
}
fun isQuasarAgentSpecified(): Boolean {
@ -286,3 +351,93 @@ class ThrowingFlow() : FlowLogic<String>(), IdempotentFlow {
return "Result"
}
}
/**
 * Test flow that sends one message to [party] and then always fails with a
 * [SQLTransientConnectionException], counting how many times it has been run.
 */
@StartableByRPC
@InitiatingFlow
class TransientConnectionFailureFlow(private val party: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(party)
        session.send("hello there")
        // A retried flow resumes from the checkpoint taken at the send, so this line runs once per attempt.
        retryCount++
        throw SQLTransientConnectionException("Connection is not available")
    }

    companion object {
        // Starts at -1 because the first (non-retry) run also increments it, bringing it to 0.
        var retryCount = -1
    }
}
/**
 * Responder for [TransientConnectionFailureFlow]: receives the single string sent by the initiator and discards it.
 */
@InitiatedBy(TransientConnectionFailureFlow::class)
class TransientConnectionFailureResponder(private val session: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        session.receive<String>().unwrap { message -> message }
    }
}
/**
 * Test flow that sends one message to [party] and then always fails with a
 * [SQLTransientConnectionException] buried two levels deep inside [IllegalStateException]s,
 * counting how many times it has been run.
 */
@StartableByRPC
@InitiatingFlow
class WrappedTransientConnectionFailureFlow(private val party: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(party)
        session.send("hello there")
        // A retried flow resumes from the checkpoint taken at the send, so this line runs once per attempt.
        retryCount++
        val rootCause = SQLTransientConnectionException("Connection is not available")
        val innerWrapper = IllegalStateException("another layer deep", rootCause)
        throw IllegalStateException("wrapped error message", innerWrapper)
    }

    companion object {
        // Starts at -1 because the first (non-retry) run also increments it, bringing it to 0.
        var retryCount = -1
    }
}
/**
 * Responder for [WrappedTransientConnectionFailureFlow]: receives the single string sent by the initiator and
 * discards it.
 */
@InitiatedBy(WrappedTransientConnectionFailureFlow::class)
class WrappedTransientConnectionFailureResponder(private val session: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        session.receive<String>().unwrap { message -> message }
    }
}
/**
 * Test flow that sends one message to [party] and then always fails with a plain
 * [IllegalStateException], counting how many times it has been run.
 */
@StartableByRPC
@InitiatingFlow
class GeneralExternalFailureFlow(private val party: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(party)
        session.send("hello there")
        // A retried flow would resume from the checkpoint taken at the send, so this line runs once per attempt.
        retryCount++
        throw IllegalStateException("Some user general exception")
    }

    companion object {
        // Starts at -1 because the first (non-retry) run also increments it, bringing it to 0.
        var retryCount = -1
    }
}
/**
 * Responder for [GeneralExternalFailureFlow]: receives the single string sent by the initiator and discards it.
 */
@InitiatedBy(GeneralExternalFailureFlow::class)
class GeneralExternalFailureResponder(private val session: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        session.receive<String>().unwrap { message -> message }
    }
}
/**
 * Flow that returns the number of rows currently in the `node_checkpoints` table, queried through the node's
 * JDBC session.
 */
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
    override fun call(): Long {
        val countStatement = serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints")
        return countStatement.use { statement ->
            statement.executeQuery().use { results ->
                // Move to the single row produced by count(*) before reading its first column.
                results.next()
                results.getLong(1)
            }
        }
    }
}

View File

@ -15,6 +15,7 @@ import net.corda.nodeapi.internal.cryptoservice.TimedCryptoServiceException
import org.hibernate.exception.ConstraintViolationException
import rx.subjects.PublishSubject
import java.sql.SQLException
import java.sql.SQLTransientConnectionException
import java.time.Duration
import java.time.Instant
import java.util.*
@ -26,7 +27,14 @@ import kotlin.math.pow
class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) {
private companion object {
private val log = contextLogger()
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, CryptoServiceTimeout, FinalityDoctor)
private val staff = listOf(
DeadlockNurse,
DuplicateInsertSpecialist,
DoctorTimeout,
CryptoServiceTimeout,
FinalityDoctor,
TransientConnectionCardiologist
)
}
private val mutex = ThreadBox(object {
@ -269,8 +277,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
}
private fun mentionsDeadlock(exception: Throwable?): Boolean {
return exception != null && (exception is SQLException && ((exception.message?.toLowerCase()?.contains("deadlock")
?: false)) || mentionsDeadlock(exception.cause))
return exception.mentionsThrowable(SQLException::class.java, "deadlock")
}
}
@ -279,16 +286,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
*/
object DuplicateInsertSpecialist : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
return if (newError.mentionsThrowable(ConstraintViolationException::class.java) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
Diagnosis.DISCHARGE
} else {
Diagnosis.NOT_MY_SPECIALTY
}
}
private fun mentionsConstraintViolation(exception: Throwable?): Boolean {
return exception != null && (exception is ConstraintViolationException || mentionsConstraintViolation(exception.cause))
}
}
/**
@ -337,4 +340,37 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
return throwable.stackTrace.any { it.className == ReceiveFinalityFlow::class.java.name }
}
}
/**
 * Detects [SQLTransientConnectionException]s that arise from failing to connect to the underlying
 * database/datasource.
 *
 * Errors that do not mention such an exception are [Diagnosis.NOT_MY_SPECIALTY]. Matching errors are
 * [Diagnosis.DISCHARGE]d while [FlowMedicalHistory.notDischargedForTheSameThingMoreThan] (with a limit of 2)
 * still holds, and are put under [Diagnosis.OVERNIGHT_OBSERVATION] once it no longer does.
 */
object TransientConnectionCardiologist : Staff {
    override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
        return when {
            !mentionsTransientConnection(newError) -> Diagnosis.NOT_MY_SPECIALTY
            // The history is only consulted once the error is known to be a transient connection failure.
            history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE
            else -> Diagnosis.OVERNIGHT_OBSERVATION
        }
    }

    // True when the error, or any of its causes, is a SQLTransientConnectionException whose message
    // mentions "connection is not available".
    private fun mentionsTransientConnection(exception: Throwable?): Boolean =
        exception.mentionsThrowable(SQLTransientConnectionException::class.java, "connection is not available")
}
}
/**
 * Checks whether this throwable, or any throwable in its [Throwable.cause] chain, is an instance of
 * [exceptionType] and (optionally) carries a message containing [errorMessage].
 *
 * The message comparison ignores case and does not depend on the JVM's default locale, so callers do not have
 * to pass a pre-lower-cased [errorMessage]. A throwable with a `null` message never matches a non-null
 * [errorMessage].
 *
 * @param exceptionType the exception class (or superclass) to look for.
 * @param errorMessage text the matching exception's message must contain; `null` matches on type alone.
 * @return `true` if a matching throwable is found; `false` otherwise, including when the receiver is `null`.
 */
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {
    // Identity-based set so a cyclic cause chain terminates instead of overflowing the stack.
    val visited = Collections.newSetFromMap(IdentityHashMap<Throwable, Boolean>())
    var current: Throwable? = this
    while (current != null && visited.add(current)) {
        val messageMatches = errorMessage == null || current.message?.contains(errorMessage, ignoreCase = true) == true
        if (exceptionType.isInstance(current) && messageMatches) {
            return true
        }
        current = current.cause
    }
    return false
}