diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt index 0727fd9949..2e24b09e66 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt @@ -2,11 +2,19 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.CordaRPCClient -import net.corda.core.flows.* +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC import net.corda.core.identity.Party import net.corda.core.internal.list import net.corda.core.internal.readAllLines import net.corda.core.messaging.startFlow +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.services.Permissions @@ -23,6 +31,7 @@ import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.node.User import net.corda.testing.node.internal.InternalDriverDSL +import net.corda.testing.node.internal.cordappsForPackages import org.jboss.byteman.agent.submit.ScriptText import org.jboss.byteman.agent.submit.Submit import org.junit.Before @@ -138,6 +147,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(1, 
output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) assertEquals(1, aliceClient.stateMachinesSnapshot().size) // 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -233,6 +245,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -328,6 +343,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(0, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -429,6 +447,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, 
output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -534,6 +555,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) assertEquals(1, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -652,6 +676,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -758,12 +785,360 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - 
overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) } } + /** + * Throws an exception when replaying a flow that has already successfully created its initial checkpoint. + * + * An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another + * exception is then thrown during the retry itself. + * + * The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight + * observation. It is not run again after this point + */ + @Test + fun `error during retry of a flow will force the flow into overnight observation`() { + driver( + DriverParameters( + startNodesInProcess = false, + inMemoryDB = false, + systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true") + ) + ) { + val charlie = startNode( + NodeParameters( + providedName = CHARLIE_NAME, + rpcUsers = listOf(rpcUser), + additionalCordapps = cordappsForPackages("package name") + ) + ).getOrThrow() + val alice = + (this as InternalDriverDSL).startNode( + NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)), + bytemanPort = 12000 + ).getOrThrow() + + val submit = Submit("localhost", 12000) + + val rules = """ + RULE Set flag when executing first suspend + CLASS ${TopLevelTransition::class.java.name} + METHOD suspendTransition + AT ENTRY + IF !flagged("suspend_flag") + DO flag("suspend_flag"); traceln("Setting suspend flag to true") + ENDRULE + + RULE Throw exception on executeCommitTransaction action after first suspend + commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF 
flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag") + DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Set flag when executing first commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && !flagged("commit_flag") + DO flag("commit_flag"); traceln("Setting commit flag to true") + ENDRULE + + RULE Throw exception on retry + CLASS ${MultiThreadedStateMachineManager::class.java.name} + METHOD addAndStartFlow + AT ENTRY + IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag") + DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submit.addScripts(listOf(ScriptText("Test script", rules))) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + assertFailsWith { + aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + Duration.of(30, ChronoUnit.SECONDS) + ) + } + + val output = alice.baseDirectory + .list() + .first { 
it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") } + .readAllLines() + + // Check the stdout for the lines generated by byteman + assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(1, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + // 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow + assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when replaying a flow that has already successfully created its initial checkpoint. + * + * An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another + * exception is then thrown during the database commit that comes as part of retrying a flow. + * + * The flow is discharged and replayed from the hospital once. When the database commit failure occurs as part of retrying the + * flow, the starting and completion of the retried flow is not affected. In other words, the error occurs as part of the replay, but the + * flow will still finish successfully. This is due to the event being scheduled as part of the retry and the failure in the database + * commit occurs after this point. As the flow is already scheduled, the failure has no effect on it. 
+ */ + @Test + fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() { + driver( + DriverParameters( + startNodesInProcess = false, + inMemoryDB = false, + systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true") + ) + ) { + val charlie = startNode( + NodeParameters( + providedName = CHARLIE_NAME, + rpcUsers = listOf(rpcUser), + additionalCordapps = cordappsForPackages("package name") + ) + ).getOrThrow() + val alice = + (this as InternalDriverDSL).startNode( + NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)), + bytemanPort = 12000 + ).getOrThrow() + + val submit = Submit("localhost", 12000) + + val rules = """ + RULE Set flag when executing first suspend + CLASS ${TopLevelTransition::class.java.name} + METHOD suspendTransition + AT ENTRY + IF !flagged("suspend_flag") + DO flag("suspend_flag"); traceln("Setting suspend flag to true") + ENDRULE + + RULE Throw exception on executeCommitTransaction action after first suspend + commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag") + DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Set flag when executing first commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && !flagged("commit_flag") + DO flag("commit_flag"); traceln("Setting commit flag to true") + ENDRULE + + RULE Throw exception on retry + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF flagged("suspend_flag") && flagged("commit_exception_flag") && !flagged("retry_exception_flag") + DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go 
again") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submit.addScripts(listOf(ScriptText("Test script", rules))) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + Duration.of(30, ChronoUnit.SECONDS) + ) + + val output = alice.baseDirectory + .list() + .first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") } + .readAllLines() + + // Check the stdout for the lines generated by byteman + assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(1, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + // 1 for GetNumberOfCheckpointsFlow + assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when replaying a flow that has not made its initial checkpoint. 
+ * + * An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another + * exception is then thrown during the retry itself. + * + * The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight + * observation. It is not run again after this point + * + * TODO Fix this scenario - it is currently hanging after putting the flow in for observation + * + */ + @Test + @Ignore + fun `error during retrying a flow that failed when committing its original checkpoint will force the flow into overnight observation`() { + driver( + DriverParameters( + startNodesInProcess = false, + inMemoryDB = false, + systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true") + ) + ) { + val charlie = startNode( + NodeParameters( + providedName = CHARLIE_NAME, + rpcUsers = listOf(rpcUser), + additionalCordapps = cordappsForPackages("package name") + ) + ).getOrThrow() + val alice = + (this as InternalDriverDSL).startNode( + NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)), + bytemanPort = 12000 + ).getOrThrow() + + val submit = Submit("localhost", 12000) + + val rules = """ + RULE Throw exception on executeCommitTransaction action after first suspend + commit + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF !flagged("commit_exception_flag") + DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on retry + CLASS ${MultiThreadedStateMachineManager::class.java.name} + METHOD onExternalStartFlow + AT ENTRY + IF flagged("commit_exception_flag") && !flagged("retry_exception_flag") + DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again") + ENDRULE + + RULE Entering internal error staff member + CLASS 
${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submit.addScripts(listOf(ScriptText("Test script", rules))) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + assertFailsWith { + aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + Duration.of(30, ChronoUnit.SECONDS) + ) + } + + val output = alice.baseDirectory + .list() + .first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") } + .readAllLines() + + // Check the stdout for the lines generated by byteman + assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(1, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + // 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow + assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + /** * Throws a [ConstraintViolationException] when performing an [Action.CommitTransaction] event when the flow is finishing. * The exception is thrown 4 times. 
@@ -848,7 +1223,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { DO traceln("Byteman test - overnight observation") ENDRULE - RULE Increment observation counter + RULE Increment terminal counter CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name} METHOD consult AT READ TERMINAL @@ -877,6 +1252,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertEquals(4, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) assertEquals(1, output.filter { it.contains("Byteman test - terminal") }.size) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(4, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) @@ -982,6 +1360,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) assertEquals(0, charlieClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow @@ -1096,6 +1477,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = 
charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) assertEquals(1, aliceClient.stateMachinesSnapshot().size) assertEquals(1, charlieClient.stateMachinesSnapshot().size) // 1 for the flow that is waiting for the errored counterparty flow to finish and 1 for GetNumberOfCheckpointsFlow @@ -1210,6 +1594,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() { // Check the stdout for the lines generated by byteman assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) assertEquals(0, charlieClient.stateMachinesSnapshot().size) // 1 for GetNumberOfCheckpointsFlow @@ -1249,4 +1636,30 @@ class GetNumberOfCheckpointsFlow : FlowLogic() { } } } +} + +@StartableByRPC +class GetHospitalCountersFlow : FlowLogic() { + override fun call(): HospitalCounts = HospitalCounts( + serviceHub.cordaService(HospitalCounter::class.java).dischargeCounter, + serviceHub.cordaService(HospitalCounter::class.java).observationCounter + ) +} + +@CordaSerializable +data class HospitalCounts(val discharge: Int, val observation: Int) + +@CordaService +class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() { + var observationCounter: Int = 0 + var dischargeCounter: Int = 0 + + init { + StaffedFlowHospital.onFlowDischarged.add { _, _ -> + ++dischargeCounter + } + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> + ++observationCounter + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt 
b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index e85a7347b3..7bd7ec0992 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -113,7 +113,7 @@ class SingleThreadedStateMachineManager( private var checkpointSerializationContext: CheckpointSerializationContext? = null private var actionExecutor: ActionExecutor? = null - override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID) + override val flowHospital: StaffedFlowHospital = makeFlowHospital() private val transitionExecutor = makeTransitionExecutor() override val allStateMachines: List> @@ -817,6 +817,13 @@ class SingleThreadedStateMachineManager( return interceptors.fold(transitionExecutor) { executor, interceptor -> interceptor(executor) } } + private fun makeFlowHospital() : StaffedFlowHospital { + // If the node is running as a notary service, we don't retain errored session initiation requests in case of missing Cordapps + // to avoid memory leaks if the notary is under heavy load. 
+ val isNotary = serviceHub.configuration.notary != null + return StaffedFlowHospital(flowMessaging, serviceHub.clock, ourSenderUUID) + } + private fun InnerState.removeFlowOrderly( flow: Flow, removalReason: FlowRemovalReason.OrderlyFinish, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 5b7f52e6ce..aec5b7ff54 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -17,17 +17,21 @@ import org.hibernate.exception.ConstraintViolationException import rx.subjects.PublishSubject import java.sql.SQLException import java.sql.SQLTransientConnectionException +import java.time.Clock import java.time.Duration import java.time.Instant import java.util.* +import java.util.concurrent.ConcurrentHashMap import javax.persistence.PersistenceException import kotlin.math.pow /** * This hospital consults "staff" to see if they can automatically diagnose and treat flows. */ -class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) { - private companion object { +class StaffedFlowHospital(private val flowMessaging: FlowMessaging, + private val clock: Clock, + private val ourSenderUUID: String) { + companion object { private val log = contextLogger() private val staff = listOf( DeadlockNurse, @@ -42,11 +46,28 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val @VisibleForTesting val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List) -> Unit>() + @VisibleForTesting + val onFlowDischarged = mutableListOf<(id: StateMachineRunId, by: List) -> Unit>() + @VisibleForTesting val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>() } + /** + * Represents the flows that have been admitted to the hospital for treatment. 
+ * Flows should be removed from [flowsInHospital] when they have completed a successful transition. + */ + private val flowsInHospital = ConcurrentHashMap() + private val mutex = ThreadBox(object { + /** + * Contains medical history of every flow (a patient) that has entered the hospital. A flow can leave the hospital, + * but their medical history will be retained. + * + * Flows should be removed from [flowPatients] when they have completed successfully. Upon successful completion, + * the medical history of a flow is no longer relevant as that flow has been completely removed from the + * statemachine. + */ val flowPatients = HashMap() val treatableSessionInits = HashMap() val recordsPublisher = PublishSubject.create() @@ -58,7 +79,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val * The node was unable to initiate the [InitialSessionMessage] from [sender]. */ fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) { - val time = Instant.now() + val time = clock.instant() val id = UUID.randomUUID() val outcome = if (error is SessionRejectException.UnknownClass) { // We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is @@ -111,10 +132,18 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val } /** - * The flow running in [flowFiber] has errored. + * Request treatment for the [flowFiber]. A flow can only be added to the hospital if they are not already being + * treated. 
*/ - fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { - val time = Instant.now() + fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { + // Only treat flows that are not already in the hospital + if (flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) { + admit(flowFiber, currentState, errors) + } + } + + private fun admit(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { + val time = clock.instant() log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState") onFlowAdmitted.forEach { it.invoke(flowFiber.id) } @@ -127,6 +156,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val Diagnosis.DISCHARGE -> { val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState) log.info("Flow error discharged from hospital (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})") + onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) } Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff) } Diagnosis.OVERNIGHT_OBSERVATION -> { @@ -194,12 +224,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val private data class ConsultationReport(val error: Throwable, val diagnosis: Diagnosis, val by: List) /** - * The flow has been removed from the state machine. + * Remove the flow's medical history from the hospital. */ - fun flowRemoved(flowId: StateMachineRunId) { + fun removeMedicalHistory(flowId: StateMachineRunId) { mutex.locked { flowPatients.remove(flowId) } } + /** + * Remove the flow from the hospital as it is not currently being treated. 
+ */ + fun leave(id: StateMachineRunId) { + flowsInHospital.remove(id) + } + // TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC /** Returns a stream of medical records as flows pass through the hospital. */ fun track(): DataFeed, MedicalRecord> { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt index 812326f9b3..c806fc56ae 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt @@ -5,7 +5,6 @@ import net.corda.core.flows.StateMachineRunId import net.corda.node.services.statemachine.* import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult -import java.util.concurrent.ConcurrentHashMap /** * This interceptor notifies the passed in [flowHospital] in case a flow went through a clean->errored or a errored->clean @@ -21,12 +20,10 @@ class HospitalisingInterceptor( } private fun removeFlow(id: StateMachineRunId) { - hospitalisedFlows.remove(id) - flowHospital.flowRemoved(id) + flowHospital.leave(id) + flowHospital.removeMedicalHistory(id) } - private val hospitalisedFlows = ConcurrentHashMap() - @Suspendable override fun executeTransition( fiber: FlowFiber, @@ -36,19 +33,17 @@ class HospitalisingInterceptor( actionExecutor: ActionExecutor ): Pair { - // If the fiber's previous state was clean then remove it from the [hospitalisedFlows] map + // If the fiber's previous state was clean then remove it from the hospital // This is important for retrying a flow that has errored during a state machine transition - if(previousState.checkpoint.errorState is ErrorState.Clean) { - 
hospitalisedFlows.remove(fiber.id) + if (previousState.checkpoint.errorState is ErrorState.Clean) { + flowHospital.leave(fiber.id) } val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) { val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception } - if (hospitalisedFlows.putIfAbsent(fiber.id, fiber) == null) { - flowHospital.flowErrored(fiber, previousState, exceptionsToHandle) - } + flowHospital.requestTreatment(fiber, previousState, exceptionsToHandle) } if (nextState.isRemoved) { removeFlow(fiber.id)