diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index d374c58ec6..278404bcb1 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -8,12 +8,15 @@ import net.corda.core.internal.TimedFlow import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.messaging.DataFeed import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.seconds import net.corda.node.services.FinalityHandler import org.hibernate.exception.ConstraintViolationException import rx.subjects.PublishSubject import java.sql.SQLException +import java.time.Duration import java.time.Instant import java.util.* +import kotlin.math.pow /** * This hospital consults "staff" to see if they can automatically diagnose and treat flows. @@ -31,6 +34,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val }) private val secureRandom = newSecureRandom() + private val delayedDischargeTimer = Timer("FlowHospitalDelayedDischargeTimer", true) /** * The node was unable to initiate the [InitialSessionMessage] from [sender]. */ @@ -91,39 +95,60 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val val time = Instant.now() log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState") - val event = mutex.locked { + val (event, backOffForChronicCondition) = mutex.locked { val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() } val report = consultStaff(flowFiber, currentState, errors, medicalHistory) - val (outcome, event) = when (report.diagnosis) { + val (outcome, event, backOffForChronicCondition) = when (report.diagnosis) { Diagnosis.DISCHARGE -> { - log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}") - Pair(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint) + val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState) + log.info("Flow ${flowFiber.id} error discharged from hospital (delay ${backOff.seconds}s) by ${report.by}") + Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff) } Diagnosis.OVERNIGHT_OBSERVATION -> { log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}") // We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart - Pair(Outcome.OVERNIGHT_OBSERVATION, null) + Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds) } Diagnosis.NOT_MY_SPECIALTY -> { // None of the staff care for these errors so we let them propagate log.info("Flow ${flowFiber.id} error allowed to propagate") - Pair(Outcome.UNTREATABLE, Event.StartErrorPropagation) + Triple(Outcome.UNTREATABLE, Event.StartErrorPropagation, 0.seconds) } } val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome) medicalHistory.records += record recordsPublisher.onNext(record) - event + Pair(event, backOffForChronicCondition) } if (event != null) { - flowFiber.scheduleEvent(event) + if (backOffForChronicCondition.isZero) { + flowFiber.scheduleEvent(event) + } else { + delayedDischargeTimer.schedule(object : TimerTask() { + override fun run() { + flowFiber.scheduleEvent(event) + } + }, backOffForChronicCondition.toMillis()) + } } } + private fun calculateBackOffForChronicCondition(report: ConsultationReport, medicalHistory: FlowMedicalHistory, currentState: StateMachineState): Duration { + return report.by.firstOrNull { it is Chronic }?.let { chronicStaff -> + return medicalHistory.timesDischargedForTheSameThing(chronicStaff, currentState).let { + if (it == 0) { + 0.seconds + } else { + maxOf(10, (10 + (Math.random()) * (10 * 1.5.pow(it)) / 2).toInt()).seconds + } + } + } ?: 0.seconds + } + private fun consultStaff(flowFiber: FlowFiber, currentState: StateMachineState, errors: List, @@ -165,8 +190,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val internal val records: MutableList = mutableListOf() fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff, currentState: StateMachineState): Boolean { + return timesDischargedForTheSameThing(by, currentState) <= max + } + + fun timesDischargedForTheSameThing(by: Staff, currentState: StateMachineState): Int { val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends - return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max + return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } } override fun toString(): String = "${this.javaClass.simpleName}(records = $records)" @@ -214,14 +243,17 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val NOT_MY_SPECIALTY } + interface Staff { fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis } + interface Chronic + /** * SQL Deadlock detection. */ - object DeadlockNurse : Staff { + object DeadlockNurse : Staff, Chronic { override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { return if (mentionsDeadlock(newError)) { Diagnosis.DISCHARGE