mirror of
https://github.com/corda/corda.git
synced 2024-12-22 06:17:55 +00:00
CORDA-2306 backoff on flow retries (#4383)
* Exponential back off for retries * Log back-off * Jitter back off timing * Set the minimum back-off to 10s * Refactor code to be only called when required. * Spelling * Make condition purely based on `Chronic` interface * Make timer a daemon thread
This commit is contained in:
parent
eb4a33e438
commit
32041930f4
@ -8,12 +8,15 @@ import net.corda.core.internal.TimedFlow
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.FinalityHandler
|
||||
import org.hibernate.exception.ConstraintViolationException
|
||||
import rx.subjects.PublishSubject
|
||||
import java.sql.SQLException
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.math.pow
|
||||
|
||||
/**
|
||||
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
|
||||
@ -31,6 +34,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
||||
})
|
||||
private val secureRandom = newSecureRandom()
|
||||
|
||||
private val delayedDischargeTimer = Timer("FlowHospitalDelayedDischargeTimer", true)
|
||||
/**
|
||||
* The node was unable to initiate the [InitialSessionMessage] from [sender].
|
||||
*/
|
||||
@ -91,39 +95,60 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
||||
val time = Instant.now()
|
||||
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
|
||||
|
||||
val event = mutex.locked {
|
||||
val (event, backOffForChronicCondition) = mutex.locked {
|
||||
val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() }
|
||||
|
||||
val report = consultStaff(flowFiber, currentState, errors, medicalHistory)
|
||||
|
||||
val (outcome, event) = when (report.diagnosis) {
|
||||
val (outcome, event, backOffForChronicCondition) = when (report.diagnosis) {
|
||||
Diagnosis.DISCHARGE -> {
|
||||
log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}")
|
||||
Pair(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint)
|
||||
val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState)
|
||||
log.info("Flow ${flowFiber.id} error discharged from hospital (delay ${backOff.seconds}s) by ${report.by}")
|
||||
Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff)
|
||||
}
|
||||
Diagnosis.OVERNIGHT_OBSERVATION -> {
|
||||
log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}")
|
||||
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
|
||||
Pair(Outcome.OVERNIGHT_OBSERVATION, null)
|
||||
Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds)
|
||||
}
|
||||
Diagnosis.NOT_MY_SPECIALTY -> {
|
||||
// None of the staff care for these errors so we let them propagate
|
||||
log.info("Flow ${flowFiber.id} error allowed to propagate")
|
||||
Pair(Outcome.UNTREATABLE, Event.StartErrorPropagation)
|
||||
Triple(Outcome.UNTREATABLE, Event.StartErrorPropagation, 0.seconds)
|
||||
}
|
||||
}
|
||||
|
||||
val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome)
|
||||
medicalHistory.records += record
|
||||
recordsPublisher.onNext(record)
|
||||
event
|
||||
Pair(event, backOffForChronicCondition)
|
||||
}
|
||||
|
||||
if (event != null) {
|
||||
flowFiber.scheduleEvent(event)
|
||||
if (backOffForChronicCondition.isZero) {
|
||||
flowFiber.scheduleEvent(event)
|
||||
} else {
|
||||
delayedDischargeTimer.schedule(object : TimerTask() {
|
||||
override fun run() {
|
||||
flowFiber.scheduleEvent(event)
|
||||
}
|
||||
}, backOffForChronicCondition.toMillis())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun calculateBackOffForChronicCondition(report: ConsultationReport, medicalHistory: FlowMedicalHistory, currentState: StateMachineState): Duration {
|
||||
return report.by.firstOrNull { it is Chronic }?.let { chronicStaff ->
|
||||
return medicalHistory.timesDischargedForTheSameThing(chronicStaff, currentState).let {
|
||||
if (it == 0) {
|
||||
0.seconds
|
||||
} else {
|
||||
maxOf(10, (10 + (Math.random()) * (10 * 1.5.pow(it)) / 2).toInt()).seconds
|
||||
}
|
||||
}
|
||||
} ?: 0.seconds
|
||||
}
|
||||
|
||||
private fun consultStaff(flowFiber: FlowFiber,
|
||||
currentState: StateMachineState,
|
||||
errors: List<Throwable>,
|
||||
@ -165,8 +190,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
||||
internal val records: MutableList<MedicalRecord.Flow> = mutableListOf()
|
||||
|
||||
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff, currentState: StateMachineState): Boolean {
|
||||
return timesDischargedForTheSameThing(by, currentState) <= max
|
||||
}
|
||||
|
||||
fun timesDischargedForTheSameThing(by: Staff, currentState: StateMachineState): Int {
|
||||
val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends
|
||||
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
|
||||
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount }
|
||||
}
|
||||
|
||||
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
|
||||
@ -214,14 +243,17 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
||||
NOT_MY_SPECIALTY
|
||||
}
|
||||
|
||||
|
||||
interface Staff {
|
||||
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
|
||||
}
|
||||
|
||||
interface Chronic
|
||||
|
||||
/**
|
||||
* SQL Deadlock detection.
|
||||
*/
|
||||
object DeadlockNurse : Staff {
|
||||
object DeadlockNurse : Staff, Chronic {
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||
return if (mentionsDeadlock(newError)) {
|
||||
Diagnosis.DISCHARGE
|
||||
|
Loading…
Reference in New Issue
Block a user