CORDA-3196 warning at intervals when flows waiting in flow hospital (#2636)

* added the warning as a TimerTask at StaffedFlowHospital#delayedDischargeTimer

* moved the scheduling of the warning task at StaffedFlowHospital#init block. That way we ensure that the task will be scheduled only once at StaffedFlowHospital initialization.

* Corrected overnight observation warning task's logging message. Changed StaffedFlowHospital#delayedDischargeTimer to the more generic StaffedFlowHospital#hospitalJobTimer since it now schedules delayed discharges tasks as well the overnight observation warning task. Removed this from property reference

* switching to fun timerTask for the instantiation of anonymous TimerTask classes

* Correct condition to log patients who are currently in the hospital, whose last record in their medical records is Outcome.OVERNIGHT_OBSERVATION. Extended logging to include treatableSessionInits staying in the hospital

* Add not empty check for patientsUnderOvernightObservation. Correct strings.
This commit is contained in:
kyriathar 2019-10-11 10:25:30 +01:00 committed by LankyDan
parent ef01a99737
commit 877ce5587f

View File

@ -11,6 +11,7 @@ import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.FinalityHandler
import org.hibernate.exception.ConstraintViolationException
@ -23,6 +24,7 @@ import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.persistence.PersistenceException
import kotlin.concurrent.timerTask
import kotlin.math.pow
/**
@ -53,6 +55,28 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>()
}
private val hospitalJobTimer = Timer("FlowHospitalJobTimer", true)
init {
// Register a task to log (at intervals) flows that are kept in hospital for overnight observation.
hospitalJobTimer.scheduleAtFixedRate(timerTask {
mutex.locked {
if (flowsInHospital.isNotEmpty()) {
// Get patients whose last record in their medical records is Outcome.OVERNIGHT_OBSERVATION.
val patientsUnderOvernightObservation =
flowsInHospital.filter { flowPatients[it.key]?.records?.last()?.outcome == Outcome.OVERNIGHT_OBSERVATION }
if (patientsUnderOvernightObservation.isNotEmpty())
log.warn("There are ${patientsUnderOvernightObservation.count()} flows kept for overnight observation. " +
"Affected flow ids: ${patientsUnderOvernightObservation.map { it.key.uuid.toString() }.joinToString()}")
}
if (treatableSessionInits.isNotEmpty()) {
log.warn("There are ${treatableSessionInits.count()} erroneous session initiations kept for overnight observation. " +
"Erroneous session initiation ids: ${treatableSessionInits.map { it.key.toString() }.joinToString()}")
}
}
}, 1.minutes.toMillis(), 1.minutes.toMillis())
}
/**
* Represents the flows that have been admitted to the hospital for treatment.
* Flows should be removed from [flowsInHospital] when they have completed a successful transition.
@ -74,7 +98,6 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
})
private val secureRandom = newSecureRandom()
private val delayedDischargeTimer = Timer("FlowHospitalDelayedDischargeTimer", true)
/**
* The node was unable to initiate the [InitialSessionMessage] from [sender].
*/
@ -182,10 +205,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
if (backOffForChronicCondition.isZero) {
flowFiber.scheduleEvent(event)
} else {
delayedDischargeTimer.schedule(object : TimerTask() {
override fun run() {
flowFiber.scheduleEvent(event)
}
hospitalJobTimer.schedule(timerTask {
flowFiber.scheduleEvent(event)
}, backOffForChronicCondition.toMillis())
}
}