ENT-6142 Flows become dead due to uncaught exceptions (#4158)

If a flow fails outside of its normal error-processing code path, it ends
up in `FlowDefaultUncaughtExceptionHandler`.

This handler will put the flow into overnight observation if possible.
This is done both in-memory and in the database.

Even so, the fiber itself has blown up and therefore never reaches
`SMM.removeFlow`, which is where `SMM.decrementLiveFibers` is called. A
flow that errored through its normal code path eventually hits this code,
as does a suspended flow that receives a shutdown event.

The `liveFibers` latch blocks the SMM from shutting down until all flows
have finished or processed shutdown events.
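
The failure mode can be illustrated with a minimal, standalone Kotlin sketch (plain
threads and a `CountDownLatch` standing in for Quasar fibers and the SMM's latch; this
is not Corda code): the latch is only counted down on the worker's normal completion
path, so an uncaught exception leaves it raised and the shutdown wait never completes.

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.TimeUnit
    import kotlin.concurrent.thread

    // Stand-in for the fiber's work; it fails outside any error handling that would do cleanup.
    fun doFlowWork() { throw IllegalStateException("blown up outside the normal error-handling path") }

    fun main() {
        val liveWorkers = CountDownLatch(1)
        val worker = thread(start = false) {
            doFlowWork()
            liveWorkers.countDown() // the analogue of SMM.removeFlow -> decrementLiveFibers; never reached
        }
        worker.setUncaughtExceptionHandler { _, throwable ->
            // The analogue of FlowDefaultUncaughtExceptionHandler: it sees the crash but cannot resume the worker.
            println("default handler saw: ${throwable.message}")
        }
        worker.start()
        worker.join()
        // The shutdown analogue: nothing is left to count the latch down, so this times out instead of completing.
        val cleanShutdown = liveWorkers.await(5, TimeUnit.SECONDS)
        println("clean shutdown: $cleanShutdown") // prints false
    }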

The changes described below resolve this problem.

Any flow that reaches the `FlowDefaultUncaughtExceptionHandler` is now
marked as dead (`StateMachineState.isDead`), highlighting that the flow
cannot continue to process events normally because it has broken out of
its event loop.
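
Condensed, the new handler behaviour looks like the sketch below (taken from the
`FlowDefaultUncaughtExceptionHandler` change in the diff further down; the surrounding
class and the hospital plumbing are elided):

    fiber.logger.warn("Caught exception from flow $id", throwable)
    if (fiber.isKilled) {
        // Already killed and crashed again: terminate manually, bypassing normal event processing.
        smm.killFlowForcibly(id)
    } else {
        innerState.withLock {
            setFlowToHospitalized(fiber, throwable)
            // The fiber has escaped its event loop, so mark it dead; retry, kill and shutdown
            // handle it directly instead of scheduling events it can never process.
            fiber.transientState = fiber.transientState.copy(isDead = true)
        }
    }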

Retrying and shutting down a dead flow are handled manually, rather than by
injecting events into the flow fiber's queue, because the fiber can no longer
execute its event loop.
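
For shutdown, that manual handling is the loop added in `awaitShutdownOfFlows`
(a condensed sketch of the state machine manager change in the diff below):

    var shutdown: Boolean
    do {
        // Dead fibers never process the scheduled Event.SoftShutdown, so remove them directly.
        // Repeating this inside the loop also catches flows that die after shutdown has started.
        innerState.withMutex {
            for ((id, flow) in flows) {
                if (flow.fiber.transientState.isDead) {
                    removeFlow(id, FlowRemovalReason.SoftShutdown, flow.fiber.transientState)
                }
            }
        }
        shutdown = liveFibers.await(1.minutes.toMillis())
        if (!shutdown) {
            shutdownLogger.log() // StateMachineShutdownLogger reports the flows still blocking shutdown
        }
    } while (!shutdown)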

Killing a dead flow executes an altered version of
`retryFlowFromSafePoint`: it deletes the checkpoint and then continues
using the checkpoint it just deleted to run the kill-flow transition on a
new fiber.

If a killed flow reaches the `FlowDefaultUncaughtExceptionHandler`, it is
forcibly terminated via `killFlowForcibly`, which deletes the checkpoint
(or updates it to `KILLED` for flows started with a client id) and then
calls `removeFlow` to bypass any event processing. This means a flow that
was dead and then killed is terminated manually if it reaches the handler
again; the same is true for flows that were not dead before but reach the
handler after being killed.
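
Both kill paths share the checkpoint handling in `updateCheckpointWhenKillingFlow`
(a condensed sketch of the code in the diff below, with explanatory comments added):

    if (isAnyCheckpointPersisted) {
        database.transaction {
            if (clientId != null) {
                // Flows started with a client id keep a KILLED checkpoint plus a KilledFlowException,
                // so a client re-attaching by client id still receives the failure.
                checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.KILLED)
                checkpointStorage.removeFlowException(id)
                checkpointStorage.addFlowException(id, exception)
            } else {
                // Without a client id the checkpoint can simply be deleted.
                checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
            }
            serviceHub.vaultService.softLockRelease(id.uuid)
        }
    }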

Also, `FlowCreator.createFlowFromCheckpoint` now retains the `isKilled`
flag from the previous fiber's state.
Dan Newton 2021-01-18 12:02:33 +00:00
parent 56df286410
commit c79ad972d0
10 changed files with 637 additions and 72 deletions


@ -153,13 +153,15 @@ abstract class StateMachineErrorHandlingTest {
runnable: Int = 0,
failed: Int = 0,
completed: Int = 0,
hospitalized: Int = 0
hospitalized: Int = 0,
killed: Int = 0
) {
val counts = startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(runnable, counts.runnable, "There should be $runnable runnable checkpoints")
assertEquals(failed, counts.failed, "There should be $failed failed checkpoints")
assertEquals(completed, counts.completed, "There should be $completed completed checkpoints")
assertEquals(hospitalized, counts.hospitalized, "There should be $hospitalized hospitalized checkpoints")
assertEquals(killed, counts.killed, "There should be $killed killed checkpoints")
}
internal fun CordaRPCOps.assertNumberOfCheckpointsAllZero() = assertNumberOfCheckpoints()
@ -189,6 +191,7 @@ abstract class StateMachineErrorHandlingTest {
class ThrowAnErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
sleep(1.seconds)
throwException()
return "cant get here"
}
@ -219,7 +222,8 @@ abstract class StateMachineErrorHandlingTest {
runnable = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.RUNNABLE),
failed = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.FAILED),
completed = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.COMPLETED),
hospitalized = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.HOSPITALIZED)
hospitalized = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.HOSPITALIZED),
killed = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.KILLED)
)
private fun getNumberOfCheckpointsWithStatus(status: Checkpoint.FlowStatus): Int {
@ -243,7 +247,8 @@ abstract class StateMachineErrorHandlingTest {
val runnable: Int = 0,
val failed: Int = 0,
val completed: Int = 0,
val hospitalized: Int = 0
val hospitalized: Int = 0,
val killed: Int = 0
)
// Internal use for testing only!!


@ -1056,4 +1056,158 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
charlie.rpc.assertHospitalCounts(discharged = 3)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.openThreadLocalWormhole] to cause an unexpected error during flow initialisation.
*
* The flow's medical history in the hospital is updated with the new failure. As the failure occurred before the original
* checkpoint was persisted, there is no checkpoint to update in the database.
*/
@Test(timeout = 300_000)
fun `unexpected error during flow initialisation that gets caught by default exception handler puts flow into in-memory overnight observation`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD openThreadLocalWormhole
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlow(
::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
)
}
Thread.sleep(10.seconds.toMillis())
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
// The flow failed during flow initialisation before committing the original checkpoint
// therefore there is no checkpoint to update the status of
alice.rpc.assertNumberOfCheckpoints(hospitalized = 0)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised.
*
* The flow's medical history in the hospital is updated with the new failure. The status of the checkpoint is also set to
* [Checkpoint.FlowStatus.HOSPITALIZED] to reflect this information in the database.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow initialisation that gets caught by default exception handler puts flow into overnight observation and reflected in database`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(::ThrowAnErrorFlow).returnValue.getOrThrow(30.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised. When updating the status of the flow to [Checkpoint.FlowStatus.HOSPITALIZED] an error occurs.
*
* The update is rescheduled and tried again. This is done separately from the fiber.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow initialisation that gets caught by default exception handler retries the status update if it fails`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception when updating status
INTERFACE ${CheckpointStorage::class.java.name}
METHOD updateStatus
AT ENTRY
IF readCounter("counter") < 2
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("should be a sql exception")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(::ThrowAnErrorFlow).returnValue.getOrThrow(50.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.recordDuration] to cause an unexpected error after a flow has returned its
* result to the client.
*
* As the flow has already returned its result to the client, its status has already been updated correctly by the time the
* unexpected error occurs. There is no need to change the status as the flow has already finished.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow has returned result to client that gets caught by default exception handler does nothing except log`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD recordDuration
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints()
}
}
}


@ -697,6 +697,58 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised, placing the flow into a dead state.
*
* On shutdown this flow will still terminate correctly and not prevent the node from shutting down.
*/
@Suppress("TooGenericExceptionCaught")
@Test(timeout = 300_000)
fun `a dead flow can be shutdown`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Log that state machine has ended
CLASS $stateMachineManagerClassName
METHOD stop
AT EXIT
IF true
DO traceln("State machine shutdown")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(::ThrowAnErrorFlow).returnValue.getOrThrow(50.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
try {
// This actually shuts down the node
alice.rpc.shutdown()
} catch(e: Exception) {
// Exception gets thrown due to shutdown
}
Thread.sleep(30.seconds.toMillis())
alice.assertBytemanOutput("State machine shutdown", 1)
}
}
@StartableByRPC
class SleepCatchAndRethrowFlow : FlowLogic<String>() {
@Suspendable


@ -5,6 +5,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
@ -140,6 +141,168 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised, placing the flow into a dead state.
*
* The flow is then manually killed which triggers the flow to go through the normal kill flow process.
*/
@Test(timeout = 300_000)
fun `a dead flow can be killed`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
val handle = alice.rpc.startFlow(::ThrowAnErrorFlow)
val id = handle.id
assertFailsWith<TimeoutException> {
handle.returnValue.getOrThrow(20.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
val killed = alice.rpc.killFlow(id)
assertTrue(killed)
Thread.sleep(20.seconds.toMillis())
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpointsAllZero()
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised, placing the flow into a dead state.
*
* The flow is then manually killed which triggers the flow to go through the normal kill flow process.
*
* Since the flow was started with a client id, a record of the [KilledFlowException] should exist in the database.
*/
@Test(timeout = 300_000)
fun `a dead flow that was started with a client id can be killed`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
val handle = alice.rpc.startFlowWithClientId("my id", ::ThrowAnErrorFlow)
val id = handle.id
assertFailsWith<TimeoutException> {
handle.returnValue.getOrThrow(20.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
val killed = alice.rpc.killFlow(id)
assertTrue(killed)
Thread.sleep(20.seconds.toMillis())
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(killed = 1)
// Exception thrown by flow
assertFailsWith<KilledFlowException> {
alice.rpc.reattachFlowWithClientId<String>("my id")?.returnValue?.getOrThrow(20.seconds)
}
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised, placing the flow into a dead state.
*
* The flow is then manually killed, but the kill transition itself also fails ([TransitionExecutorImpl.executeTransition] throws),
* sending the flow back to the [FlowDefaultUncaughtExceptionHandler], which terminates it via killFlowForcibly.
*/
@Test(timeout = 300_000)
fun `a dead flow that is killed and fails again will forcibly kill itself`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") == 0
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception 2
CLASS ${TransitionExecutorImpl::class.java.name}
METHOD executeTransition
AT ENTRY
IF readCounter("counter") == 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die again")
ENDRULE
RULE Log that removeFlow is called
CLASS $stateMachineManagerClassName
METHOD removeFlow
AT EXIT
IF true
DO traceln("removeFlow called")
ENDRULE
RULE Log that killFlowForcibly is called
CLASS $stateMachineManagerClassName
METHOD killFlowForcibly
AT EXIT
IF true
DO traceln("killFlowForcibly called")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
val handle = alice.rpc.startFlow(::ThrowAnErrorFlow)
val id = handle.id
assertFailsWith<TimeoutException> {
handle.returnValue.getOrThrow(20.seconds)
}
assertTrue(alice.rpc.killFlow(id))
Thread.sleep(20.seconds.toMillis())
alice.assertBytemanOutput("removeFlow called", 1)
alice.assertBytemanOutput("killFlowForcibly called", 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpointsAllZero()
}
}
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {


@ -92,6 +92,7 @@ class FlowCreator(
lock: Semaphore = Semaphore(1),
resultFuture: OpenFuture<Any?> = openFuture(),
firstRestore: Boolean = true,
isKilled: Boolean = false,
progressTracker: ProgressTracker? = null
): Flow<*>? {
val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore)
@ -116,7 +117,8 @@ class FlowCreator(
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount
?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null,
numberOfCommits = checkpoint.checkpointState.numberOfCommits,
lock = lock
lock = lock,
isKilled = isKilled
)
injectOldProgressTracker(progressTracker, fiber.logic)
return Flow(fiber, resultFuture)
@ -248,7 +250,8 @@ class FlowCreator(
numberOfCommits: Int,
lock: Semaphore,
deduplicationHandler: DeduplicationHandler? = null,
senderUUID: String? = null
senderUUID: String? = null,
isKilled: Boolean = false
): StateMachineState {
return StateMachineState(
checkpoint = checkpoint,
@ -259,7 +262,8 @@ class FlowCreator(
isAnyCheckpointPersisted = anyCheckpointPersisted,
isStartIdempotent = false,
isRemoved = false,
isKilled = false,
isKilled = isKilled,
isDead = false,
flowLogic = fiber.logic,
senderUUID = senderUUID,
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount,


@ -10,7 +10,9 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
class FlowDefaultUncaughtExceptionHandler(
internal class FlowDefaultUncaughtExceptionHandler(
private val smm: StateMachineManagerInternal,
private val innerState: StateMachineInnerState,
private val flowHospital: StaffedFlowHospital,
private val checkpointStorage: CheckpointStorage,
private val database: CordaPersistence,
@ -31,7 +33,19 @@ class FlowDefaultUncaughtExceptionHandler(
)
} else {
fiber.logger.warn("Caught exception from flow $id", throwable)
setFlowToHospitalized(fiber, throwable)
if (fiber.isKilled) {
// If the flow was already killed and it has reached this exception handler then the flow must be killed forcibly to
// ensure it terminates. This could lead to sessions related to the flow not terminating as errors might not have been
// propagated to them.
smm.killFlowForcibly(id)
} else {
innerState.withLock {
setFlowToHospitalized(fiber, throwable)
// This flow has died and cannot continue to run as normal. Mark it as dead so that it can be handled directly by
// retry, kill and shutdown operations.
fiber.transientState = fiber.transientState.copy(isDead = true)
}
}
}
}
@ -52,9 +66,13 @@ class FlowDefaultUncaughtExceptionHandler(
@Suppress("TooGenericExceptionCaught")
private fun setFlowToHospitalizedRescheduleOnFailure(id: StateMachineRunId) {
try {
log.debug { "Updating the status of flow $id to hospitalized after uncaught exception" }
database.transaction { checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.HOSPITALIZED) }
log.debug { "Updated the status of flow $id to hospitalized after uncaught exception" }
innerState.withLock {
if (flows[id]?.fiber?.transientState?.isDead == true) {
log.debug { "Updating the status of flow $id to hospitalized after uncaught exception" }
database.transaction { checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.HOSPITALIZED) }
log.debug { "Updated the status of flow $id to hospitalized after uncaught exception" }
}
}
} catch (e: Exception) {
log.info("Failed to update the status of flow $id to hospitalized after uncaught exception, rescheduling", e)
scheduledExecutor.schedule({ setFlowToHospitalizedRescheduleOnFailure(id) }, RESCHEDULE_DELAY, TimeUnit.SECONDS)


@ -33,6 +33,7 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.minutes
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
@ -242,6 +243,8 @@ internal class SingleThreadedStateMachineManager(
private fun setFlowDefaultUncaughtExceptionHandler() {
Fiber.setDefaultUncaughtExceptionHandler(
FlowDefaultUncaughtExceptionHandler(
this,
innerState,
flowHospital,
checkpointStorage,
database,
@ -272,17 +275,40 @@ internal class SingleThreadedStateMachineManager(
if (stopping) throw IllegalStateException("Already stopping!")
stopping = true
for ((_, flow) in flows) {
flow.fiber.scheduleEvent(Event.SoftShutdown)
if (!flow.fiber.transientState.isDead) {
flow.fiber.scheduleEvent(Event.SoftShutdown)
}
}
}
// Account for any expected Fibers in a test scenario.
liveFibers.countDown(allowedUnsuspendedFiberCount)
liveFibers.await()
awaitShutdownOfFlows()
flowHospital.close()
scheduledFutureExecutor.shutdown()
scheduler.shutdown()
}
private fun awaitShutdownOfFlows() {
val shutdownLogger = StateMachineShutdownLogger(innerState)
var shutdown: Boolean
do {
// Manually shutdown dead flows as they can no longer process scheduled events.
// This needs to be repeated in this loop to prevent flows that die after shutdown is triggered from being forgotten.
// The mutex is not enough protection to stop race conditions here; the removal of dead flows has to be repeated.
innerState.withMutex {
for ((id, flow) in flows) {
if (flow.fiber.transientState.isDead) {
removeFlow(id, FlowRemovalReason.SoftShutdown, flow.fiber.transientState)
}
}
}
shutdown = liveFibers.await(1.minutes.toMillis())
if (!shutdown) {
shutdownLogger.log()
}
} while (!shutdown)
}
/**
* Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to [changes] and
* calls to [allStateMachines]
@ -365,31 +391,29 @@ internal class SingleThreadedStateMachineManager(
override fun killFlow(id: StateMachineRunId): Boolean {
val flow = innerState.withLock { flows[id] }
val killFlowResult = flow?.let { killInMemoryFlow(it) } ?: killOutOfMemoryFlow(id)
val killFlowResult = flow?.let {
if (flow.fiber.transientState.isDead) {
// We cannot rely on fiber event processing in dead flows.
killInMemoryDeadFlow(it)
} else {
// Healthy flows need an event in case they are suspended.
killInMemoryFlow(it)
}
} ?: killOutOfMemoryFlow(id)
return killFlowResult || flowHospital.dropSessionInit(id)
}
private fun killInMemoryFlow(flow: Flow<*>): Boolean {
val id = flow.fiber.id
return flow.withFlowLock(VALID_KILL_FLOW_STATUSES) {
if (!flow.fiber.transientState.isKilled) {
flow.fiber.transientState = flow.fiber.transientState.copy(isKilled = true)
if (!transientState.isKilled) {
transientState = transientState.copy(isKilled = true)
logger.info("Killing flow $id known to this node.")
// The checkpoint and soft locks are handled here as well as in a flow's transition. This means that we do not need to rely
// on the processing of the next event after setting the killed flag. This is to ensure a flow can be updated/removed from
// the database, even if it is stuck in a infinite loop.
if (flow.fiber.transientState.isAnyCheckpointPersisted) {
database.transaction {
if (flow.fiber.clientId != null) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.KILLED)
checkpointStorage.removeFlowException(id)
checkpointStorage.addFlowException(id, KilledFlowException(id))
} else {
checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
}
serviceHub.vaultService.softLockRelease(id.uuid)
}
}
updateCheckpointWhenKillingFlow(
id = id,
clientId = transientState.checkpoint.checkpointState.invocationContext.clientId,
isAnyCheckpointPersisted = transientState.isAnyCheckpointPersisted
)
unfinishedFibers.countDown()
scheduleEvent(Event.DoRemainingWork)
@ -401,6 +425,67 @@ internal class SingleThreadedStateMachineManager(
}
}
private fun killInMemoryDeadFlow(flow: Flow<*>): Boolean {
val id = flow.fiber.id
return flow.withFlowLock(VALID_KILL_FLOW_STATUSES) {
if (!transientState.isKilled) {
transientState = transientState.copy(isKilled = true)
logger.info("Killing dead flow $id known to this node.")
val (flowForRetry, _) = createNewFlowForRetry(transientState) ?: return false
updateCheckpointWhenKillingFlow(
id = id,
clientId = transientState.checkpoint.checkpointState.invocationContext.clientId,
isAnyCheckpointPersisted = transientState.isAnyCheckpointPersisted
)
unfinishedFibers.countDown()
innerState.withLock {
if (stopping) {
return true
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(transientState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flowForRetry != null) {
addAndStartFlow(id, flowForRetry)
}
}
true
} else {
logger.info("A repeated request to kill flow $id has been made, ignoring...")
false
}
}
}
private fun updateCheckpointWhenKillingFlow(
id: StateMachineRunId,
clientId: String?,
isAnyCheckpointPersisted: Boolean,
exception: KilledFlowException = KilledFlowException(id)
) {
// The checkpoint and soft locks are handled here as well as in a flow's transition. This means that we do not need to rely
// on the processing of the next event after setting the killed flag. This is to ensure a flow can be updated/removed from
// the database, even if it is stuck in an infinite loop or cannot be run (checkpoint cannot be deserialized from database).
if (isAnyCheckpointPersisted) {
database.transaction {
if (clientId != null) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.KILLED)
checkpointStorage.removeFlowException(id)
checkpointStorage.addFlowException(id, exception)
} else {
checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
}
serviceHub.vaultService.softLockRelease(id.uuid)
}
}
}
private fun killOutOfMemoryFlow(id: StateMachineRunId): Boolean {
return database.transaction {
val checkpoint = checkpointStorage.getCheckpoint(id)
@ -423,6 +508,25 @@ internal class SingleThreadedStateMachineManager(
}
}
override fun killFlowForcibly(flowId: StateMachineRunId): Boolean {
val flow = innerState.withLock { flows[flowId] }
flow?.withFlowLock(VALID_KILL_FLOW_STATUSES) {
logger.info("Forcibly killing flow $flowId, errors will not be propagated to the flow's sessions")
updateCheckpointWhenKillingFlow(
id = flowId,
clientId = transientState.checkpoint.checkpointState.invocationContext.clientId,
isAnyCheckpointPersisted = transientState.isAnyCheckpointPersisted
)
removeFlow(
flowId,
FlowRemovalReason.ErrorFinish(listOf(FlowError(secureRandom.nextLong(), KilledFlowException(flowId)))),
transientState
)
return true
}
return false
}
private fun markAllFlowsAsPaused() {
return checkpointStorage.markAllPaused()
}
@ -540,48 +644,8 @@ internal class SingleThreadedStateMachineManager(
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
// Ignore [isAnyCheckpointPersisted] as the checkpoint could be committed but the flag remains un-updated
val checkpointLoadingStatus = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) ?: return@transaction CheckpointLoadingStatus.NotFound
val checkpoint = serializedCheckpoint.let {
tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
checkpointStorage.removeFlowException(flowId)
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
}
} ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize
}
CheckpointLoadingStatus.Success(checkpoint)
}
val (flow, numberOfCommitsFromCheckpoint) = when {
// Resurrect flow
checkpointLoadingStatus is CheckpointLoadingStatus.Success -> {
val numberOfCommitsFromCheckpoint = checkpointLoadingStatus.checkpoint.checkpointState.numberOfCommits
val flow = flowCreator.createFlowFromCheckpoint(
flowId,
checkpointLoadingStatus.checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
currentState.lock,
firstRestore = false,
progressTracker = currentState.flowLogic.progressTracker
) ?: return
flow to numberOfCommitsFromCheckpoint
}
checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return
else -> {
// Just flow initiation message
null to -1
}
}
val (flow, numberOfCommitsFromCheckpoint) = createNewFlowForRetry(currentState) ?: return
innerState.withLock {
if (stopping) {
@ -599,6 +663,53 @@ internal class SingleThreadedStateMachineManager(
}
}
private fun createNewFlowForRetry(currentState: StateMachineState): Pair<Flow<*>?, Int>? {
val id = currentState.flowLogic.runId
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
// Ignore [isAnyCheckpointPersisted] as the checkpoint could be committed but the flag remains un-updated
val checkpointLoadingStatus = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(id) ?: return@transaction CheckpointLoadingStatus.NotFound
val checkpoint = serializedCheckpoint.let {
tryDeserializeCheckpoint(serializedCheckpoint, id)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
checkpointStorage.removeFlowException(id)
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
}
} ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize
}
CheckpointLoadingStatus.Success(checkpoint)
}
return when {
// Resurrect flow
checkpointLoadingStatus is CheckpointLoadingStatus.Success -> {
val numberOfCommitsFromCheckpoint = checkpointLoadingStatus.checkpoint.checkpointState.numberOfCommits
val flow = flowCreator.createFlowFromCheckpoint(
id,
checkpointLoadingStatus.checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
currentState.lock,
firstRestore = false,
isKilled = currentState.isKilled,
progressTracker = currentState.flowLogic.progressTracker
) ?: return null
flow to numberOfCommitsFromCheckpoint
}
checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> {
logger.error("Unable to find database checkpoint for flow $id. Something is very wrong. The flow will not retry.")
null
}
checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return null
else -> {
// Just flow initiation message
null to -1
}
}
}
/**
* Extract all the [ExternalEvent] from this flows event queue and queue them (in the correct order) in the PausedFlow.
* This differs from [extractAndScheduleEventsForRetry] which also extracts (and schedules) [Event.Pause]. This means that if there are


@ -142,6 +142,7 @@ internal interface StateMachineManagerInternal {
fun retryFlowFromSafePoint(currentState: StateMachineState)
fun scheduleFlowTimeout(flowId: StateMachineRunId)
fun cancelFlowTimeout(flowId: StateMachineRunId)
fun killFlowForcibly(flowId: StateMachineRunId): Boolean
}
/**


@ -0,0 +1,54 @@
package net.corda.node.services.statemachine
import net.corda.core.utilities.contextLogger
internal class StateMachineShutdownLogger(private val innerState: StateMachineInnerState) {
private companion object {
val log = contextLogger()
}
fun log() {
innerState.withLock {
val message = StringBuilder("Shutdown of the state machine is blocked.\n")
val deadFlowMessage = StringBuilder()
if (flows.isNotEmpty()) {
message.append("The following live flows have not shutdown:\n")
for ((id, flow) in flows) {
val state = flow.fiber.transientState
val line = " - $id with properties " +
"[Status: ${state.checkpoint.status}, " +
"IO request: ${state.checkpoint.flowIoRequest ?: "Unstarted"}, " +
"Suspended: ${!state.isFlowResumed}, " +
"Last checkpoint timestamp: ${state.checkpoint.timestamp}, " +
"Killed: ${state.isKilled}]\n"
if (!state.isDead) {
message.append(line)
} else {
deadFlowMessage.append(line)
}
}
}
if (pausedFlows.isNotEmpty()) {
message.append("The following paused flows have not shutdown:\n")
for ((id, flow) in pausedFlows) {
message.append(
" - $id with properties " +
"[Status: ${flow.checkpoint.status}, " +
"IO request: ${flow.checkpoint.flowIoRequest ?: "Unstarted"}, " +
"Last checkpoint timestamp: ${flow.checkpoint.timestamp}, " +
"Resumable: ${flow.resumable}, " +
"Hospitalized: ${flow.hospitalized}]\n"
)
}
}
if (deadFlowMessage.isNotEmpty()) {
deadFlowMessage.insert(0, "The following dead (crashed) flows have not shutdown:\n")
message.append(deadFlowMessage)
}
message.append("Manual intervention maybe be required for state machine shutdown due to these flows.\n")
message.append("Continuing state machine shutdown loop...")
log.info(message.toString())
}
}
}


@ -47,6 +47,8 @@ import java.util.concurrent.Semaphore
* work.
* @param isKilled true if the flow has been marked as killed. This is used to cause a flow to move to a killed flow transition no matter
* what event it is set to process next.
* @param isDead true if the flow has been marked as dead. This happens when a flow experiences an unexpected error and escapes its event loop
* which prevents it from processing events.
* @param senderUUID the identifier of the sending state machine or null if this flow is resumed from a checkpoint so that it does not participate in de-duplication high-water-marking.
* @param reloadCheckpointAfterSuspendCount The number of times a flow has been reloaded (not retried). This is [null] when
* [NodeConfiguration.reloadCheckpointAfterSuspendCount] is not enabled.
@ -68,6 +70,7 @@ data class StateMachineState(
val isStartIdempotent: Boolean,
val isRemoved: Boolean,
val isKilled: Boolean,
val isDead: Boolean,
val senderUUID: String?,
val reloadCheckpointAfterSuspendCount: Int?,
var numberOfCommits: Int,