mirror of
https://github.com/corda/corda.git
synced 2025-02-21 01:42:24 +00:00
CORDA-2813 Handle loss of database (#4962)
* CORDA-2813 Add DbException handler to terminate process when the DB goes away or gives up. Also add a helper to terminate the node and use it instead of calling Runtime.halt() in random places. * CORDA-2813 Address threading/blocking concerns raised during review
This commit is contained in:
parent
a012b75c31
commit
9668b8530a
@ -0,0 +1,28 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import net.corda.node.utilities.errorAndTerminate
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.sql.SQLException
|
||||
|
||||
/**
 * Uncaught-exception handler that terminates the node when a thread dies because of a database failure.
 *
 * If a thread dies because it can't connect to the database, the node ends up in an inconsistent state.
 * Fail fast and hard.
 *
 * A throwable is treated as a database failure when it is an [Error] whose direct cause is a [SQLException].
 * Everything else is passed to [parentHandler] if one was supplied; otherwise it is printed to `System.err`
 * and logged, mirroring the default behaviour of [ThreadGroup].
 *
 * @param parentHandler optional handler that receives every throwable not handled as a database failure.
 */
class DbExceptionHandler(private val parentHandler: Thread.UncaughtExceptionHandler? = null) : Thread.UncaughtExceptionHandler {
    /**
     * Handles a throwable that escaped thread [t].
     *
     * Both parameters are declared nullable, so neither is dereferenced with `!!`: an exception thrown from
     * inside an uncaught-exception handler is ignored by the JVM, which would silently skip the termination
     * and the logging below.
     *
     * @param t the thread that is about to terminate, if known.
     * @param e the throwable that was not caught, if known.
     */
    override fun uncaughtException(t: Thread?, e: Throwable?) {
        val threadName = t?.name ?: "<unknown>"

        // the error is a database connection issue - pull the rug
        if (e is Error && e.cause is SQLException) {
            errorAndTerminate("Thread $threadName failed due to database connection error. This is unrecoverable, terminating node.", e)
        }

        // replicate the default error handling from ThreadGroup for all other unhandled exceptions
        if (parentHandler != null) {
            parentHandler.uncaughtException(t, e)
        } else if (e !is ThreadDeath) {
            System.err.print("Exception in thread \"$threadName\" ")
            if (e != null) {
                e.printStackTrace(System.err)
            } else {
                // no stack trace to print - just finish the line started above
                System.err.println()
            }
            LoggerFactory.getLogger(this.javaClass.name).error("Exception in thread \"$threadName\"", e)
        }
    }
}
|
@ -420,6 +420,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
override fun start(): NodeInfo {
|
||||
registerDefaultExceptionHandler()
|
||||
initialiseSerialization()
|
||||
val nodeInfo: NodeInfo = super.start()
|
||||
nodeReadyFuture.thenMatch({
|
||||
@ -438,6 +439,14 @@ open class Node(configuration: NodeConfiguration,
|
||||
return nodeInfo
|
||||
}
|
||||
|
||||
/**
 * Install a [DbExceptionHandler] as the default uncaught exception handler for all threads, so that the process is
 * terminated if the database connection goes away and cannot be recovered. The handler that was registered as the
 * default beforehand (if any) is handed to the new handler as its parent.
 */
private fun registerDefaultExceptionHandler() {
    val previousDefault = Thread.getDefaultUncaughtExceptionHandler()
    val dbAwareHandler = DbExceptionHandler(previousDefault)
    Thread.setDefaultUncaughtExceptionHandler(dbAwareHandler)
}
|
||||
|
||||
/**
|
||||
* A hook to allow configuration override of the JmxReporter being used.
|
||||
*/
|
||||
|
@ -25,6 +25,7 @@ import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.logging.pushToLoggingContext
|
||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||
import net.corda.node.utilities.errorAndTerminate
|
||||
import net.corda.node.utilities.isEnabledTimedFlow
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
@ -34,7 +35,6 @@ import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.slf4j.MDC
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.reflect.KProperty1
|
||||
|
||||
@ -241,9 +241,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
Try.Success(result)
|
||||
} catch (t: Throwable) {
|
||||
if(t.isUnrecoverable()) {
|
||||
logger.error("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", t)
|
||||
Fiber.sleep(Duration.ofSeconds(10).toMillis()) // To allow async logger to flush.
|
||||
Runtime.getRuntime().halt(1)
|
||||
errorAndTerminate("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", t)
|
||||
}
|
||||
logger.info("Flow raised an error... sending it to flow hospital", t)
|
||||
Try.Failure<R>(t)
|
||||
|
@ -44,6 +44,7 @@ import net.corda.node.services.statemachine.interceptors.HospitalisingIntercepto
|
||||
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
|
||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.errorAndTerminate
|
||||
import net.corda.node.utilities.injectOldProgressTracker
|
||||
import net.corda.node.utilities.isEnabledTimedFlow
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -51,7 +52,6 @@ import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||
import net.corda.serialization.internal.withTokenContext
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.apache.logging.log4j.LogManager
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.lang.Integer.min
|
||||
@ -148,9 +148,7 @@ class SingleThreadedStateMachineManager(
|
||||
metrics.register("Flows.InFlight", Gauge<Int> { mutex.content.flows.size })
|
||||
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
|
||||
if (throwable is VirtualMachineError) {
|
||||
(fiber as FlowStateMachineImpl<*>).logger.error("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", throwable)
|
||||
LogManager.shutdown(true)
|
||||
Runtime.getRuntime().halt(1)
|
||||
errorAndTerminate("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", throwable)
|
||||
} else {
|
||||
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
|
||||
}
|
||||
|
@ -0,0 +1,21 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import net.corda.core.utilities.seconds
|
||||
import org.slf4j.LoggerFactory
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/**
 * Log error message and terminate the process. This might not clean up resources and could leave
 * the system in a messy state.
 *
 * The message is logged from a freshly started thread, the calling thread then waits ten seconds so the
 * logger has a chance to flush, and finally the JVM is halted with exit status 1. The halt is performed
 * in a `finally` block so that it still happens if starting the logging thread fails or the wait is
 * interrupted.
 *
 * @param message the error message to log before terminating.
 * @param e the throwable that caused the termination, logged together with [message]; may be `null`.
 */
@Synchronized
fun errorAndTerminate(message: String, e: Throwable?) {
    try {
        thread {
            val log = LoggerFactory.getLogger("errorAndTerminate")
            log.error(message, e)
        }

        // give the logger a chance to flush the error message before killing the node
        Thread.sleep(10.seconds.toMillis())
    } finally {
        // Thread.sleep throws InterruptedException if the caller is interrupted; the node must still
        // be killed in that case, otherwise it carries on in the broken state we are trying to escape.
        Runtime.getRuntime().halt(1)
    }
}
|
Loading…
x
Reference in New Issue
Block a user