CORDA-3194 Stop killed flows from re-entering hospital (#2664)

* CORDA-3194 Do not allow killed flows back into the hospital

This change has been made to prevent killed flows from being added back
to the hospital after being forcibly removed by `killFlow`. Without it,
references to a killed flow could be left inside the hospital, which is
not the correct behaviour.

`killFlow` now sets a flow's `StateMachineState.isRemoved` to true.

This check is then used in `StaffedFlowHospital` and the
`DumpHistoryOnErrorInterceptor`.

* CORDA-3194 Log different message for transition error due to killed flow

When a flow is killed, its checkpoint is deleted. Currently, the
state machine will still try to process the next event even if the flow
has been killed. This can lead to an error when trying to update the
deleted checkpoint. The exception thrown from this is logged.

An if statement has been added to log a different message at debug level
if the error is due to a checkpoint update failing for a killed flow. This
is done so as not to alarm node operators with the exception.
This commit is contained in:
Dan Newton 2019-10-17 16:09:27 +01:00 committed by LankyDan
parent 3b3dbd7352
commit f4e9b9d5d2
4 changed files with 341 additions and 29 deletions

View File

@ -14,12 +14,15 @@ import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.list
import net.corda.core.internal.readAllLines
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueAndPaymentFlow
@ -53,6 +56,7 @@ import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class StatemachineErrorHandlingTest : IntegrationTest() {
@ -129,7 +133,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
}
@ -208,7 +212,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
val output = getBytemanOutput(alice)
@ -286,7 +290,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
val output = getBytemanOutput(alice)
@ -370,7 +374,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
val output = getBytemanOutput(alice)
@ -457,7 +461,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
}
@ -559,7 +563,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
val output = getBytemanOutput(alice)
@ -648,7 +652,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
val output = getBytemanOutput(alice)
@ -745,7 +749,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
}
@ -844,7 +848,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
val output = getBytemanOutput(alice)
@ -929,7 +933,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
}
@ -1021,7 +1025,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
}
@ -1110,7 +1114,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
val output = getBytemanOutput(charlie)
@ -1206,7 +1210,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
}
@ -1304,7 +1308,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
30.seconds
)
val output = getBytemanOutput(charlie)
@ -1381,7 +1385,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
charlie.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS))
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
@ -1452,7 +1456,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
charlie.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS))
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
@ -1539,7 +1543,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
charlie.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS))
).returnValue.getOrThrow(30.seconds)
val output = getBytemanOutput(charlie)
@ -1635,7 +1639,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
charlie.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS))
).returnValue.getOrThrow(30.seconds)
}
val output = getBytemanOutput(charlie)
@ -1655,6 +1659,265 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
}
}
/**
 * Calls `killFlow` on a [SleepFlow] roughly five seconds after it reports its `STARTED` progress step, i.e. while the
 * flow is inside its second (two minute) flow sleep.
 *
 * Expectations checked by this test:
 * - the flow's result never arrives within 20 seconds (a [TimeoutException] is thrown) and `killFlow` returned `true`;
 * - Byteman traced no discharge and no overnight observation, and exactly one terminal diagnosis;
 * - the hospital counters returned by [GetHospitalCountersFlow] are both zero;
 * - no state machines remain and the only checkpoint counted is the one belonging to [GetNumberOfCheckpointsFlow].
 */
@Test
fun `error during transition due to an InterruptedException (killFlow) will terminate the flow`() {
    startDriver {
        val aliceNode = createBytemanNode(ALICE_NAME)

        // Every rule below only traces to stdout; none of them alters the node's behaviour.
        val practitioner = StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name
        val bytemanRules = """
            RULE Entering internal error staff member
            CLASS $practitioner
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS $practitioner
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS $practitioner
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
            RULE Increment terminal counter
            CLASS $practitioner
            METHOD consult
            AT READ TERMINAL
            IF true
            DO traceln("Byteman test - terminal")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(bytemanRules)

        val rpc = CordaRPCClient(aliceNode.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
        val trackedFlow = rpc.startTrackedFlow(::SleepFlow)

        // Written from the progress subscription callback, read by the assertions further down.
        var flowKilled = false
        trackedFlow.progress.subscribe { step ->
            if (step == SleepFlow.STARTED.label) {
                // Give the flow time to reach its long sleep before killing it.
                Thread.sleep(5000)
                flowKilled = rpc.killFlow(trackedFlow.id)
            }
        }

        assertFailsWith<TimeoutException> { trackedFlow.returnValue.getOrThrow(20.seconds) }

        val bytemanOutput = getBytemanOutput(aliceNode)
        assertTrue(flowKilled)

        // Check the stdout for the lines generated by byteman
        assertEquals(0, bytemanOutput.count { it.contains("Byteman test - discharging") })
        assertEquals(0, bytemanOutput.count { it.contains("Byteman test - overnight observation") })
        val terminalDiagnoses = bytemanOutput.count { it.contains("Byteman test - terminal") }
        assertEquals(1, terminalDiagnoses)

        val (dischargeCount, observationCount) = rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
        assertEquals(0, dischargeCount)
        assertEquals(0, observationCount)
        assertEquals(0, rpc.stateMachinesSnapshot().size)
        // 1 for GetNumberOfCheckpointsFlow
        assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
    }
}
/**
 * Triggers `killFlow` during user application code.
 *
 * The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable]
 * call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly.
 *
 * Although the call to kill the flow is made during user application code, the flow will not be removed / stop
 * processing until the next suspension point is reached within the flow.
 *
 * Expectations checked by this test:
 * - the flow's result never arrives within 30 seconds (a [TimeoutException] is thrown) and `killFlow` returned `true`;
 * - Byteman traced no discharge, no overnight observation and no terminal diagnosis;
 * - the hospital counters returned by [GetHospitalCountersFlow] are both zero;
 * - no state machines remain and the only checkpoint counted is the one belonging to [GetNumberOfCheckpointsFlow].
 */
@Test
fun `flow killed during user code execution stops and removes the flow correctly`() {
    startDriver {
        val alice = createBytemanNode(ALICE_NAME)

        // Every rule below only traces to stdout; none of them alters the node's behaviour.
        val rules = """
            RULE Entering internal error staff member
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
            RULE Increment terminal counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ TERMINAL
            IF true
            DO traceln("Byteman test - terminal")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(rules)

        val aliceClient =
            CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
        val flow = aliceClient.startTrackedFlow(::ThreadSleepFlow)

        // Written from the progress subscription callback, read by the assertions further down.
        var flowKilled = false
        flow.progress.subscribe {
            if (it == ThreadSleepFlow.STARTED.label) {
                // Wait so that the kill request lands while the flow is blocked in its non-suspending sleep.
                Thread.sleep(5000)
                flowKilled = aliceClient.killFlow(flow.id)
            }
        }

        assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(30.seconds) }

        val output = getBytemanOutput(alice)
        assertTrue(flowKilled)

        // Check the stdout for the lines generated by byteman
        assertEquals(0, output.count { it.contains("Byteman test - discharging") })
        assertEquals(0, output.count { it.contains("Byteman test - overnight observation") })
        assertEquals(0, output.count { it.contains("Byteman test - terminal") })

        val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
        assertEquals(0, discharge)
        assertEquals(0, observation)
        assertEquals(0, aliceClient.stateMachinesSnapshot().size)
        // 1 for GetNumberOfCheckpointsFlow
        assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
    }
}
/**
 * Calls `killFlow` on a flow that has already been kept in the hospital for observation.
 *
 * A Byteman rule throws a [RuntimeException] on entry to `executeSendInitial` while the Byteman counter is below 4
 * (incrementing the counter each time). The test starts a [SendAMessageFlow], waits 20 seconds for it to time out and
 * only then kills it.
 *
 * Expectations checked by this test:
 * - Byteman traced three discharges, one overnight observation and no terminal diagnosis; these are attributed to
 *   the flow's passes through the hospital before it was killed, not to the kill itself;
 * - the hospital counters returned by [GetHospitalCountersFlow] are 3 discharges and 1 observation;
 * - no state machines remain and the only checkpoint counted is the one belonging to [GetNumberOfCheckpointsFlow].
 */
@Test
fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
    startDriver {
        val aliceNode = createBytemanNode(ALICE_NAME)
        val charlieNode = createNode(CHARLIE_NAME)

        val actionExecutor = ActionExecutorImpl::class.java.name
        val practitioner = StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name
        val bytemanRules = """
            RULE Create Counter
            CLASS $actionExecutor
            METHOD executeSendInitial
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeSendInitial action
            CLASS $actionExecutor
            METHOD executeSendInitial
            AT ENTRY
            IF readCounter("counter") < 4
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
            RULE Entering internal error staff member
            CLASS $practitioner
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS $practitioner
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS $practitioner
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
            RULE Increment terminal counter
            CLASS $practitioner
            METHOD consult
            AT READ TERMINAL
            IF true
            DO traceln("Byteman test - terminal")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(bytemanRules)

        val rpc = CordaRPCClient(aliceNode.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
        val flowHandle = rpc.startFlow(::SendAMessageFlow, charlieNode.nodeInfo.singleIdentity())

        // The flow is expected to still be unfinished after 20 seconds; kill it afterwards.
        assertFailsWith<TimeoutException> { flowHandle.returnValue.getOrThrow(20.seconds) }
        rpc.killFlow(flowHandle.id)

        val bytemanOutput = getBytemanOutput(aliceNode)

        // Check the stdout for the lines generated by byteman
        assertEquals(3, bytemanOutput.count { it.contains("Byteman test - discharging") })
        assertEquals(1, bytemanOutput.count { it.contains("Byteman test - overnight observation") })
        val terminalDiagnoses = bytemanOutput.count { it.contains("Byteman test - terminal") }
        assertEquals(0, terminalDiagnoses)

        val (dischargeCount, observationCount) = rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
        assertEquals(3, dischargeCount)
        assertEquals(1, observationCount)
        assertEquals(0, rpc.stateMachinesSnapshot().size)
        // 1 for GetNumberOfCheckpointsFlow
        assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
    }
}
private fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) {
driver(
DriverParameters(
@ -1724,6 +1987,44 @@ class SendAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>(
}
}
/**
 * Test flow that sleeps for one second, moves its progress tracker to [STARTED] and then sleeps for a further two
 * minutes. Both sleeps go through the flow `sleep` call that takes a [Duration].
 */
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {

    /** Progress step set once the initial one second sleep has finished. */
    object STARTED : ProgressTracker.Step("I am ready to die")

    override val progressTracker = ProgressTracker(STARTED)

    @Suspendable
    override fun call() {
        sleep(Duration.ofSeconds(1))
        progressTracker.currentStep = STARTED
        // Long sleep during which the tests call killFlow on this flow.
        sleep(Duration.ofMinutes(2))
    }
}
/**
 * Test flow that mimics long running user code. It sleeps for one second, moves its progress tracker to [STARTED],
 * blocks the thread for 20 seconds with [Thread.sleep] (logging before and after) and finally sleeps for two minutes.
 */
@StartableByRPC
class ThreadSleepFlow : FlowLogic<Unit>() {

    /** Progress step set immediately before the blocking "application" sleep begins. */
    object STARTED : ProgressTracker.Step("I am ready to die")

    override val progressTracker = ProgressTracker(STARTED)

    @Suspendable
    override fun call() {
        sleep(Duration.ofSeconds(1))
        progressTracker.currentStep = STARTED
        logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep")
        blockThread()
        logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep")
        sleep(Duration.ofMinutes(2))
    }

    // The blocking sleep lives outside of the `@Suspendable` function to prevent issues with Quasar
    private fun blockThread() {
        Thread.sleep(20_000)
    }
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {

View File

@ -160,7 +160,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
*/
fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
// Only treat flows that are not already in the hospital
if (flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) {
if (!currentState.isRemoved && flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) {
admit(flowFiber, currentState, errors)
}
}

View File

@ -9,6 +9,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.security.SecureRandom
import javax.persistence.OptimisticLockException
/**
* This [TransitionExecutor] runs the transition actions using the passed in [ActionExecutor] and manually dirties the
@ -52,6 +53,12 @@ class TransitionExecutorImpl(
// Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork
// to trigger error propagation
log.info("Error while executing $action, with event $event, erroring state", exception)
if(previousState.isRemoved && exception is OptimisticLockException) {
log.debug("Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " +
"Occurred while executing $action, with event $event", exception)
} else {
log.info("Error while executing $action, with event $event, erroring state", exception)
}
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(

View File

@ -35,17 +35,21 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation)
val record = records.compute(fiber.id) { _, record ->
(record ?: ArrayList()).apply { add(transitionRecord) }
}
// Just if we decide to propagate, and not if just on the way to the hospital. Only log at debug level here - the flow transition
// information is often unhelpful in the logs, and the actual cause of the problem will be logged elsewhere.
if (nextState.checkpoint.errorState is ErrorState.Errored && nextState.checkpoint.errorState.propagating) {
log.warn("Flow ${fiber.id} errored, dumping all transitions:\n${record!!.joinToString("\n")}")
for (error in nextState.checkpoint.errorState.errors) {
log.warn("Flow ${fiber.id} error", error.exception)
if (!previousState.isRemoved) {
val transitionRecord =
TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation)
val record = records.compute(fiber.id) { _, record ->
(record ?: ArrayList()).apply { add(transitionRecord) }
}
// Just if we decide to propagate, and not if just on the way to the hospital. Only log at debug level here - the flow transition
// information is often unhelpful in the logs, and the actual cause of the problem will be logged elsewhere.
if (nextState.checkpoint.errorState is ErrorState.Errored && nextState.checkpoint.errorState.propagating) {
log.warn("Flow ${fiber.id} errored, dumping all transitions:\n${record!!.joinToString("\n")}")
for (error in nextState.checkpoint.errorState.errors) {
log.warn("Flow ${fiber.id} error", error.exception)
}
}
}