From ef01a99737abea722a8764dc7b127bcc7ceca043 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Wed, 9 Oct 2019 10:53:00 +0100 Subject: [PATCH] CORDA-3194 Failure during flow retry forces the flow into overnight observation (#2640) When a flow fails to retry, it should be kept in for overnight observation and aborted. In the future, it might be possible to retry flows again that failed during their retry, but for now keeping for observation and aborting is satisfactory. * CORDA-3194 Remove hospitalised flows from `HospitalisingInterceptor` Small refactor to remove some of the hospital logic out of the `HospitalisingInterceptor` and into the `StaffedFlowHospital`. Add some comments to help clarify the purpose of the two maps inside of the hospital. * CORDA-3194 When a flow fails to retry force it into observation When a flow fails to retry, it should be kept in for overnight observation and aborted. In the future, it might be possible to retry flows again that failed during their retry, but for now keeping for observation and aborting is satisfactory. * CORDA-3194 Test for database commit failure when retrying a flow Failing during the database commit failure that occurs after the retry flow action does not stop the flow from actually retrying. This test confirms this functionality. The retried flow gets scheduled as part of the retry action. The failure in the commit action does not prevent this since it has already been scheduled. --- .../StatemachineErrorHandlingTest.kt | 417 +++++++++++++++++- .../SingleThreadedStateMachineManager.kt | 9 +- .../statemachine/StaffedFlowHospital.kt | 53 ++- .../interceptors/HospitalisingInterceptor.kt | 17 +- 4 files changed, 474 insertions(+), 22 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt index 0727fd9949..2e24b09e66 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt @@ -2,11 +2,19 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.CordaRPCClient -import net.corda.core.flows.* +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC import net.corda.core.identity.Party import net.corda.core.internal.list import net.corda.core.internal.readAllLines import net.corda.core.messaging.startFlow +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.services.Permissions @@ -23,6 +31,7 @@ import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.node.User import net.corda.testing.node.internal.InternalDriverDSL +import net.corda.testing.node.internal.cordappsForPackages import org.jboss.byteman.agent.submit.ScriptText import org.jboss.byteman.agent.submit.Submit import org.junit.Before @@ -138,6 +147,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) assertEquals(1, aliceClient.stateMachinesSnapshot().size) // 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -233,6 +245,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -328,6 +343,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(0, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -429,6 +447,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -534,6 +555,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) assertEquals(1, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -652,6 +676,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -758,12 +785,360 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) } } + /** + * Throws an exception when replaying a flow that has already successfully created its initial checkpoint. + * + * An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another + * exception is then thrown during the retry itself. + * + * The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight + * observation. It is not ran again after this point + */ + @Test + fun `error during retry of a flow will force the flow into overnight observation`() { + driver( + DriverParameters( + startNodesInProcess = false, + inMemoryDB = false, + systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true") + ) + ) { + val charlie = startNode( + NodeParameters( + providedName = CHARLIE_NAME, + rpcUsers = listOf(rpcUser), + additionalCordapps = cordappsForPackages("package name") + ) + ).getOrThrow() + val alice = + (this as InternalDriverDSL).startNode( + NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)), + bytemanPort = 12000 + ).getOrThrow() + + val submit = Submit("localhost", 12000) + + val rules = """ + RULE Set flag when executing first suspend + CLASS ${TopLevelTransition::class.java.name} + METHOD suspendTransition + AT ENTRY + IF !flagged("suspend_flag") + DO flag("suspend_flag"); traceln("Setting suspend flag to true") + ENDRULE + + RULE Throw exception on executeCommitTransaction action after first suspend + commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag") + DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Set flag when executing first commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && !flagged("commit_flag") + DO flag("commit_flag"); traceln("Setting commit flag to true") + ENDRULE + + RULE Throw exception on retry + CLASS ${MultiThreadedStateMachineManager::class.java.name} + METHOD addAndStartFlow + AT ENTRY + IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag") + DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submit.addScripts(listOf(ScriptText("Test script", rules))) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + assertFailsWith { + aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + Duration.of(30, ChronoUnit.SECONDS) + ) + } + + val output = alice.baseDirectory + .list() + .first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") } + .readAllLines() + + // Check the stdout for the lines generated by byteman + assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(1, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + // 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow + assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when replaying a flow that has already successfully created its initial checkpoint. + * + * An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another + * exception is then thrown during the database commit that comes as part of retrying a flow. + * + * The flow is discharged and replayed from the hospital once. When the database commit failure occurs as part of retrying the + * flow, the starting and completion of the retried flow is affected. In other words, the error occurs as part of the replay, but the + * flow will still finish successfully. This is due to the even being scheduled as part of the retry and the failure in the database + * commit occurs after this point. As the flow is already scheduled, the failure has not affect on it. + */ + @Test + fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() { + driver( + DriverParameters( + startNodesInProcess = false, + inMemoryDB = false, + systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true") + ) + ) { + val charlie = startNode( + NodeParameters( + providedName = CHARLIE_NAME, + rpcUsers = listOf(rpcUser), + additionalCordapps = cordappsForPackages("package name") + ) + ).getOrThrow() + val alice = + (this as InternalDriverDSL).startNode( + NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)), + bytemanPort = 12000 + ).getOrThrow() + + val submit = Submit("localhost", 12000) + + val rules = """ + RULE Set flag when executing first suspend + CLASS ${TopLevelTransition::class.java.name} + METHOD suspendTransition + AT ENTRY + IF !flagged("suspend_flag") + DO flag("suspend_flag"); traceln("Setting suspend flag to true") + ENDRULE + + RULE Throw exception on executeCommitTransaction action after first suspend + commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag") + DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Set flag when executing first commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && !flagged("commit_flag") + DO flag("commit_flag"); traceln("Setting commit flag to true") + ENDRULE + + RULE Throw exception on retry + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && flagged("commit_exception_flag") && !flagged("retry_exception_flag") + DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submit.addScripts(listOf(ScriptText("Test script", rules))) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + Duration.of(30, ChronoUnit.SECONDS) + ) + + val output = alice.baseDirectory + .list() + .first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") } + .readAllLines() + + // Check the stdout for the lines generated by byteman + assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(1, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + // 1 for GetNumberOfCheckpointsFlow + assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when replaying a flow that has not made its initial checkpoint. + * + * An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another + * exception is then thrown during the retry itself. + * + * The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight + * observation. It is not ran again after this point + * + * TODO Fix this scenario - it is currently hanging after putting the flow in for observation + * + */ + @Test + @Ignore + fun `error during retrying a flow that failed when committing its original checkpoint will force the flow into overnight observation`() { + driver( + DriverParameters( + startNodesInProcess = false, + inMemoryDB = false, + systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true") + ) + ) { + val charlie = startNode( + NodeParameters( + providedName = CHARLIE_NAME, + rpcUsers = listOf(rpcUser), + additionalCordapps = cordappsForPackages("package name") + ) + ).getOrThrow() + val alice = + (this as InternalDriverDSL).startNode( + NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)), + bytemanPort = 12000 + ).getOrThrow() + + val submit = Submit("localhost", 12000) + + val rules = """ + RULE Throw exception on executeCommitTransaction action after first suspend + commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF !flagged("commit_exception_flag") + DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on retry + CLASS ${MultiThreadedStateMachineManager::class.java.name} + METHOD onExternalStartFlow + AT ENTRY + IF flagged("commit_exception_flag") && !flagged("retry_exception_flag") + DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submit.addScripts(listOf(ScriptText("Test script", rules))) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + assertFailsWith { + aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + Duration.of(30, ChronoUnit.SECONDS) + ) + } + + val output = alice.baseDirectory + .list() + .first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") } + .readAllLines() + + // Check the stdout for the lines generated by byteman + assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(1, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + // 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow + assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + /** * Throws a [ConstraintViolationException] when performing an [Action.CommitTransaction] event when the flow is finishing. * The exception is thrown 4 times. @@ -848,7 +1223,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { DO traceln("Byteman test - overnight observation") ENDRULE - RULE Increment observation counter + RULE Increment terminal counter CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name} METHOD consult AT READ TERMINAL @@ -877,6 +1252,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertEquals(4, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) assertEquals(1, output.filter { it.contains("Byteman test - terminal") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(4, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -982,6 +1360,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) assertEquals(0, charlieClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow @@ -1096,6 +1477,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) assertEquals(1, aliceClient.stateMachinesSnapshot().size) assertEquals(1, charlieClient.stateMachinesSnapshot().size) // 1 for the flow that is waiting for the errored counterparty flow to finish and 1 for GetNumberOfCheckpointsFlow @@ -1210,6 +1594,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) assertEquals(0, charlieClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow @@ -1249,4 +1636,30 @@ class GetNumberOfCheckpointsFlow : FlowLogic() { } } } +} + +@StartableByRPC +class GetHospitalCountersFlow : FlowLogic() { + override fun call(): HospitalCounts = HospitalCounts( + serviceHub.cordaService(HospitalCounter::class.java).dischargeCounter, + serviceHub.cordaService(HospitalCounter::class.java).observationCounter + ) +} + +@CordaSerializable +data class HospitalCounts(val discharge: Int, val observation: Int) + +@CordaService +class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() { + var observationCounter: Int = 0 + var dischargeCounter: Int = 0 + + init { + StaffedFlowHospital.onFlowDischarged.add { _, _ -> + ++dischargeCounter + } + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> + ++observationCounter + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index e85a7347b3..7bd7ec0992 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -113,7 +113,7 @@ class SingleThreadedStateMachineManager( private var checkpointSerializationContext: CheckpointSerializationContext? = null private var actionExecutor: ActionExecutor? = null - override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID) + override val flowHospital: StaffedFlowHospital = makeFlowHospital() private val transitionExecutor = makeTransitionExecutor() override val allStateMachines: List> @@ -817,6 +817,13 @@ class SingleThreadedStateMachineManager( return interceptors.fold(transitionExecutor) { executor, interceptor -> interceptor(executor) } } + private fun makeFlowHospital() : StaffedFlowHospital { + // If the node is running as a notary service, we don't retain errored session initiation requests in case of missing Cordapps + // to avoid memory leaks if the notary is under heavy load. + val isNotary = serviceHub.configuration.notary != null + return StaffedFlowHospital(flowMessaging, serviceHub.clock, ourSenderUUID) + } + private fun InnerState.removeFlowOrderly( flow: Flow, removalReason: FlowRemovalReason.OrderlyFinish, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 5b7f52e6ce..aec5b7ff54 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -17,17 +17,21 @@ import org.hibernate.exception.ConstraintViolationException import rx.subjects.PublishSubject import java.sql.SQLException import java.sql.SQLTransientConnectionException +import java.time.Clock import java.time.Duration import java.time.Instant import java.util.* +import java.util.concurrent.ConcurrentHashMap import javax.persistence.PersistenceException import kotlin.math.pow /** * This hospital consults "staff" to see if they can automatically diagnose and treat flows. */ -class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) { - private companion object { +class StaffedFlowHospital(private val flowMessaging: FlowMessaging, + private val clock: Clock, + private val ourSenderUUID: String) { + companion object { private val log = contextLogger() private val staff = listOf( DeadlockNurse, @@ -42,11 +46,28 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val @VisibleForTesting val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List) -> Unit>() + @VisibleForTesting + val onFlowDischarged = mutableListOf<(id: StateMachineRunId, by: List) -> Unit>() + @VisibleForTesting val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>() } + /** + * Represents the flows that have been admitted to the hospital for treatment. + * Flows should be removed from [flowsInHospital] when they have completed a successful transition. + */ + private val flowsInHospital = ConcurrentHashMap() + private val mutex = ThreadBox(object { + /** + * Contains medical history of every flow (a patient) that has entered the hospital. A flow can leave the hospital, + * but their medical history will be retained. + * + * Flows should be removed from [flowPatients] when they have completed successfully. Upon successful completion, + * the medical history of a flow is no longer relevant as that flow has been completely removed from the + * statemachine. + */ val flowPatients = HashMap() val treatableSessionInits = HashMap() val recordsPublisher = PublishSubject.create() @@ -58,7 +79,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val * The node was unable to initiate the [InitialSessionMessage] from [sender]. */ fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) { - val time = Instant.now() + val time = clock.instant() val id = UUID.randomUUID() val outcome = if (error is SessionRejectException.UnknownClass) { // We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is @@ -111,10 +132,18 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val } /** - * The flow running in [flowFiber] has errored. + * Request treatment for the [flowFiber]. A flow can only be added to the hospital if they are not already being + * treated. */ - fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { - val time = Instant.now() + fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { + // Only treat flows that are not already in the hospital + if (flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) { + admit(flowFiber, currentState, errors) + } + } + + private fun admit(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { + val time = clock.instant() log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState") onFlowAdmitted.forEach { it.invoke(flowFiber.id) } @@ -127,6 +156,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val Diagnosis.DISCHARGE -> { val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState) log.info("Flow error discharged from hospital (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})") + onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) } Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff) } Diagnosis.OVERNIGHT_OBSERVATION -> { @@ -194,12 +224,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val private data class ConsultationReport(val error: Throwable, val diagnosis: Diagnosis, val by: List) /** - * The flow has been removed from the state machine. + * Remove the flow's medical history from the hospital. */ - fun flowRemoved(flowId: StateMachineRunId) { + fun removeMedicalHistory(flowId: StateMachineRunId) { mutex.locked { flowPatients.remove(flowId) } } + /** + * Remove the flow from the hospital as it is not currently being treated. + */ + fun leave(id: StateMachineRunId) { + flowsInHospital.remove(id) + } + // TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC /** Returns a stream of medical records as flows pass through the hospital. */ fun track(): DataFeed, MedicalRecord> { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt index 812326f9b3..c806fc56ae 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt @@ -5,7 +5,6 @@ import net.corda.core.flows.StateMachineRunId import net.corda.node.services.statemachine.* import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult -import java.util.concurrent.ConcurrentHashMap /** * This interceptor notifies the passed in [flowHospital] in case a flow went through a clean->errored or a errored->clean @@ -21,12 +20,10 @@ class HospitalisingInterceptor( } private fun removeFlow(id: StateMachineRunId) { - hospitalisedFlows.remove(id) - flowHospital.flowRemoved(id) + flowHospital.leave(id) + flowHospital.removeMedicalHistory(id) } - private val hospitalisedFlows = ConcurrentHashMap() - @Suspendable override fun executeTransition( fiber: FlowFiber, @@ -36,19 +33,17 @@ class HospitalisingInterceptor( actionExecutor: ActionExecutor ): Pair { - // If the fiber's previous state was clean then remove it from the [hospitalisedFlows] map + // If the fiber's previous state was clean then remove it from the hospital // This is important for retrying a flow that has errored during a state machine transition - if(previousState.checkpoint.errorState is ErrorState.Clean) { - hospitalisedFlows.remove(fiber.id) + if (previousState.checkpoint.errorState is ErrorState.Clean) { + flowHospital.leave(fiber.id) } val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) { val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception } - if (hospitalisedFlows.putIfAbsent(fiber.id, fiber) == null) { - flowHospital.flowErrored(fiber, previousState, exceptionsToHandle) - } + flowHospital.requestTreatment(fiber, previousState, exceptionsToHandle) } if (nextState.isRemoved) { removeFlow(fiber.id)