diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt index c05543f0b1..1f8b1835f2 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt @@ -14,12 +14,15 @@ import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.list import net.corda.core.internal.readAllLines import net.corda.core.messaging.startFlow +import net.corda.core.messaging.startTrackedFlow import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.core.utilities.unwrap import net.corda.finance.DOLLARS import net.corda.finance.flows.CashIssueAndPaymentFlow @@ -53,6 +56,7 @@ import java.time.temporal.ChronoUnit import java.util.concurrent.TimeoutException import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertTrue class StatemachineErrorHandlingTest : IntegrationTest() { @@ -129,7 +133,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -208,7 +212,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -286,7 +290,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -370,7 +374,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -457,7 +461,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -559,7 +563,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -648,7 +652,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -745,7 +749,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -844,7 +848,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -929,7 +933,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -1021,7 +1025,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -1110,7 +1114,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(charlie) @@ -1206,7 +1210,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -1304,7 +1308,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(charlie) @@ -1381,7 +1385,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { charlie.nodeInfo.singleIdentity(), false, defaultNotaryIdentity - ).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) + ).returnValue.getOrThrow(30.seconds) val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() assertEquals(0, discharge) @@ -1452,7 +1456,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { charlie.nodeInfo.singleIdentity(), false, defaultNotaryIdentity - ).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) + ).returnValue.getOrThrow(30.seconds) val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() assertEquals(0, discharge) @@ -1539,7 +1543,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { charlie.nodeInfo.singleIdentity(), false, defaultNotaryIdentity - ).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) + ).returnValue.getOrThrow(30.seconds) val output = getBytemanOutput(charlie) @@ -1635,7 +1639,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { charlie.nodeInfo.singleIdentity(), false, defaultNotaryIdentity - ).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) + ).returnValue.getOrThrow(30.seconds) } val output = getBytemanOutput(charlie) @@ -1655,6 +1659,265 @@ class StatemachineErrorHandlingTest : IntegrationTest() { } } + /** + * Triggers `killFlow` while the flow is suspended causing a [InterruptedException] to be thrown and passed through the hospital. + * + * The flow terminates and is not retried. + * + * No pass through the hospital is recorded. As the flow is marked as `isRemoved`. + */ + @Test + fun `error during transition due to an InterruptedException (killFlow) will terminate the flow`() { + startDriver { + val alice = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + + RULE Increment terminal counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ TERMINAL + IF true + DO traceln("Byteman test - terminal") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + val flow = aliceClient.startTrackedFlow(::SleepFlow) + + var flowKilled = false + flow.progress.subscribe { + if (it == SleepFlow.STARTED.label) { + Thread.sleep(5000) + flowKilled = aliceClient.killFlow(flow.id) + } + } + + assertFailsWith { flow.returnValue.getOrThrow(20.seconds) } + + val output = getBytemanOutput(alice) + + assertTrue(flowKilled) + // Check the stdout for the lines generated by byteman + assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size + assertEquals(1, numberOfTerminalDiagnoses) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(0, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + // 1 for GetNumberOfCheckpointsFlow + assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Triggers `killFlow` during user application code. + * + * The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable] + * call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly. + * + * Although the call to kill the flow is made during user application code. It will not be removed / stop processing + * until the next suspension point is reached within the flow. + * + * The flow terminates and is not retried. + * + * No pass through the hospital is recorded. As the flow is marked as `isRemoved`. + */ + @Test + fun `flow killed during user code execution stops and removes the flow correctly`() { + startDriver { + val alice = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + + RULE Increment terminal counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ TERMINAL + IF true + DO traceln("Byteman test - terminal") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + val flow = aliceClient.startTrackedFlow(::ThreadSleepFlow) + + var flowKilled = false + flow.progress.subscribe { + if (it == ThreadSleepFlow.STARTED.label) { + Thread.sleep(5000) + flowKilled = aliceClient.killFlow(flow.id) + } + } + + assertFailsWith { flow.returnValue.getOrThrow(30.seconds) } + + val output = getBytemanOutput(alice) + + assertTrue(flowKilled) + // Check the stdout for the lines generated by byteman + assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size + println(numberOfTerminalDiagnoses) + assertEquals(0, numberOfTerminalDiagnoses) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(0, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + // 1 for GetNumberOfCheckpointsFlow + assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Triggers `killFlow` after the flow has already been sent to observation. The flow is not running at this point and + * all that remains is its checkpoint in the database. + * + * The flow terminates and is not retried. + * + * Killing the flow does not lead to any passes through the hospital. All the recorded passes through the hospital are + * from the original flow that was put in for observation. + */ + @Test + fun `flow killed when it is in the flow hospital for observation is removed correctly`() { + startDriver { + val alice = createBytemanNode(ALICE_NAME) + val charlie = createNode(CHARLIE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeSendInitial + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeSendInitial action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeSendInitial + AT ENTRY + IF readCounter("counter") < 4 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + + RULE Increment terminal counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ TERMINAL + IF true + DO traceln("Byteman test - terminal") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + val flow = aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()) + + assertFailsWith { flow.returnValue.getOrThrow(20.seconds) } + + aliceClient.killFlow(flow.id) + + val output = getBytemanOutput(alice) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size + assertEquals(0, numberOfTerminalDiagnoses) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + // 1 for GetNumberOfCheckpointsFlow + assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + private fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) { driver( DriverParameters( @@ -1724,6 +1987,44 @@ class SendAMessageResponder(private val session: FlowSession) : FlowLogic( } } +@StartableByRPC +class SleepFlow : FlowLogic() { + + object STARTED : ProgressTracker.Step("I am ready to die") + + override val progressTracker = ProgressTracker(STARTED) + + @Suspendable + override fun call() { + sleep(Duration.of(1, ChronoUnit.SECONDS)) + progressTracker.currentStep = STARTED + sleep(Duration.of(2, ChronoUnit.MINUTES)) + } +} + +@StartableByRPC +class ThreadSleepFlow : FlowLogic() { + + object STARTED : ProgressTracker.Step("I am ready to die") + + override val progressTracker = ProgressTracker(STARTED) + + @Suspendable + override fun call() { + sleep(Duration.of(1, ChronoUnit.SECONDS)) + progressTracker.currentStep = STARTED + logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep") + sleep() + logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep") + sleep(Duration.of(2, ChronoUnit.MINUTES)) + } + + // Sleep is moved outside of `@Suspendable` function to prevent issues with Quasar + private fun sleep() { + Thread.sleep(20000) + } +} + @StartableByRPC class GetNumberOfCheckpointsFlow : FlowLogic() { override fun call(): Long { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index b02e718daf..023f541504 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -160,7 +160,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, */ fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { // Only treat flows that are not already in the hospital - if (flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) { + if (!currentState.isRemoved && flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) { admit(flowFiber, currentState, errors) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt index 6a95665097..85a9f57f60 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt @@ -9,6 +9,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.contextDatabase import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import java.security.SecureRandom +import javax.persistence.OptimisticLockException /** * This [TransitionExecutor] runs the transition actions using the passed in [ActionExecutor] and manually dirties the @@ -52,6 +53,12 @@ class TransitionExecutorImpl( // Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork // to trigger error propagation log.info("Error while executing $action, with event $event, erroring state", exception) + if(previousState.isRemoved && exception is OptimisticLockException) { + log.debug("Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " + + "Occurred while executing $action, with event $event", exception) + } else { + log.info("Error while executing $action, with event $event, erroring state", exception) + } val newState = previousState.copy( checkpoint = previousState.checkpoint.copy( errorState = previousState.checkpoint.errorState.addErrors( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt index ecef21157b..6b5b8132dd 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt @@ -35,17 +35,21 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti actionExecutor: ActionExecutor ): Pair { val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) - val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation) - val record = records.compute(fiber.id) { _, record -> - (record ?: ArrayList()).apply { add(transitionRecord) } - } - // Just if we decide to propagate, and not if just on the way to the hospital. Only log at debug level here - the flow transition - // information is often unhelpful in the logs, and the actual cause of the problem will be logged elsewhere. - if (nextState.checkpoint.errorState is ErrorState.Errored && nextState.checkpoint.errorState.propagating) { - log.warn("Flow ${fiber.id} errored, dumping all transitions:\n${record!!.joinToString("\n")}") - for (error in nextState.checkpoint.errorState.errors) { - log.warn("Flow ${fiber.id} error", error.exception) + if (!previousState.isRemoved) { + val transitionRecord = + TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation) + val record = records.compute(fiber.id) { _, record -> + (record ?: ArrayList()).apply { add(transitionRecord) } + } + + // Just if we decide to propagate, and not if just on the way to the hospital. Only log at debug level here - the flow transition + // information is often unhelpful in the logs, and the actual cause of the problem will be logged elsewhere. + if (nextState.checkpoint.errorState is ErrorState.Errored && nextState.checkpoint.errorState.propagating) { + log.warn("Flow ${fiber.id} errored, dumping all transitions:\n${record!!.joinToString("\n")}") + for (error in nextState.checkpoint.errorState.errors) { + log.warn("Flow ${fiber.id} error", error.exception) + } } }