CORDA-3194 Failure during flow retry forces the flow into overnight observation (#2640)

When a flow fails to retry, it should be kept in for overnight observation and aborted.

In the future, it might be possible to retry flows again that failed during their retry, but for now keeping for observation and aborting is satisfactory.

* CORDA-3194 Remove hospitalised flows from `HospitalisingInterceptor`

Small refactor to remove some of the hospital logic out of the
`HospitalisingInterceptor` and into the `StaffedFlowHospital`.

Add some comments to help clarify the purpose of the two maps inside
of the hospital.

* CORDA-3194 When a flow fails to retry force it into observation

When a flow fails to retry, it should be kept in for overnight
observation and aborted.

In the future, it might be possible to retry flows again that failed
during their retry, but for now keeping for observation and aborting is
satisfactory.

* CORDA-3194 Test for database commit failure when retrying a flow

A failure during the database commit that occurs after the retry
flow action does not stop the flow from actually retrying. This test
confirms this behaviour.

The retried flow gets scheduled as part of the retry action. The failure
in the commit action does not prevent this since it has already been
scheduled.
This commit is contained in:
Dan Newton 2019-10-09 10:53:00 +01:00 committed by LankyDan
parent 268d129838
commit ef01a99737
4 changed files with 474 additions and 22 deletions

View File

@ -2,11 +2,19 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.internal.list
import net.corda.core.internal.readAllLines
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
@ -23,6 +31,7 @@ import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.User
import net.corda.testing.node.internal.InternalDriverDSL
import net.corda.testing.node.internal.cordappsForPackages
import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit
import org.junit.Before
@ -138,6 +147,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
@ -233,6 +245,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
@ -328,6 +343,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
@ -429,6 +447,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
@ -534,6 +555,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
@ -652,6 +676,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
@ -758,12 +785,360 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when replaying a flow that has already successfully created its initial checkpoint.
*
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
* exception is then thrown during the retry itself.
*
* The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight
* observation. It is not run again after this point.
*/
@Test
fun `error during retry of a flow will force the flow into overnight observation`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
// Charlie is the counterparty that SendAMessageFlow is started against below
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser),
additionalCordapps = cordappsForPackages("package name")
)
).getOrThrow()
// Alice is started with a byteman port so that the rules below can be submitted to her process
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
// Byteman script: throws once from executeCommitTransaction after the suspend and commit flags are set, then throws once
// more from MultiThreadedStateMachineManager.addAndStartFlow. The remaining rules only trace lines to stdout.
val rules = """
RULE Set flag when executing first suspend
CLASS ${TopLevelTransition::class.java.name}
METHOD suspendTransition
AT ENTRY
IF !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag")
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Set flag when executing first commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Throw exception on retry
CLASS ${MultiThreadedStateMachineManager::class.java.name}
METHOD addAndStartFlow
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag")
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
// The flow is expected never to return a result: waiting on it must time out after 30 seconds
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
}
// Read Alice's stdout log, which is where the byteman traceln output is looked for
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
// The stdout counts above come from byteman rules hooked into TransitionErrorGeneralPractitioner.consult, whereas these
// counters are fed by the hospital's own discharge/observation hooks (see HospitalCounter). The observation counter is
// therefore expected to be 1 here even though no byteman observation line was traced.
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(1, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when replaying a flow that has already successfully created its initial checkpoint.
*
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
* exception is then thrown during the database commit that comes as part of retrying a flow.
*
* The flow is discharged and replayed from the hospital once. When the database commit failure occurs as part of retrying the
* flow, the starting and completion of the retried flow is not affected. In other words, the error occurs as part of the replay,
* but the flow will still finish successfully. This is due to the event being scheduled as part of the retry and the failure in
* the database commit occurring after this point. As the flow is already scheduled, the failure has no effect on it.
*/
@Test
fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
// Charlie is the counterparty that SendAMessageFlow is started against below
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser),
additionalCordapps = cordappsForPackages("package name")
)
).getOrThrow()
// Alice is started with a byteman port so that the rules below can be submitted to her process
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
// Byteman script: both throwing rules hook ActionExecutorImpl.executeCommitTransaction. The first throws once after the
// suspend and commit flags are set; the second throws once after the first exception has been flagged. The remaining
// rules only trace lines to stdout.
val rules = """
RULE Set flag when executing first suspend
CLASS ${TopLevelTransition::class.java.name}
METHOD suspendTransition
AT ENTRY
IF !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag")
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Set flag when executing first commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Throw exception on retry
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_exception_flag") && !flagged("retry_exception_flag")
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
// Unlike the forced-observation scenario, the flow is expected to complete within the 30 second timeout
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
// Read Alice's stdout log, which is where the byteman traceln output is looked for
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
// Cross-check the byteman output against the counters recorded through the hospital's own hooks (see HospitalCounter)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(1, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when replaying a flow that has not made its initial checkpoint.
*
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
* exception is then thrown during the retry itself.
*
* The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight
* observation. It is not run again after this point.
*
* TODO Fix this scenario - it is currently hanging after putting the flow in for observation
*
*/
@Test
// Disabled until the hang described in the TODO above is resolved
@Ignore
fun `error during retrying a flow that failed when committing its original checkpoint will force the flow into overnight observation`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
// Charlie is the counterparty that SendAMessageFlow is started against below
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser),
additionalCordapps = cordappsForPackages("package name")
)
).getOrThrow()
// Alice is started with a byteman port so that the rules below can be submitted to her process
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
// Byteman script: throws from the very first executeCommitTransaction call (no suspend/commit flags are required here),
// then throws once more from MultiThreadedStateMachineManager.onExternalStartFlow. The remaining rules only trace to stdout.
val rules = """
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF !flagged("commit_exception_flag")
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on retry
CLASS ${MultiThreadedStateMachineManager::class.java.name}
METHOD onExternalStartFlow
AT ENTRY
IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
// The flow is expected never to return a result: waiting on it must time out after 30 seconds
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
}
// Read Alice's stdout log, which is where the byteman traceln output is looked for
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(1, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
// The stdout counts above come from byteman rules hooked into TransitionErrorGeneralPractitioner.consult, whereas these
// counters are fed by the hospital's own discharge/observation hooks (see HospitalCounter). The observation counter is
// therefore expected to be 1 here even though no byteman observation line was traced.
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(1, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws a [ConstraintViolationException] when performing an [Action.CommitTransaction] event when the flow is finishing.
* The exception is thrown 4 times.
@ -848,7 +1223,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment observation counter
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name}
METHOD consult
AT READ TERMINAL
@ -877,6 +1252,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
assertEquals(4, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - terminal") }.size)
val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(4, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
@ -982,6 +1360,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
@ -1096,6 +1477,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for the flow that is waiting for the errored counterparty flow to finish and 1 for GetNumberOfCheckpointsFlow
@ -1210,6 +1594,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
@ -1249,4 +1636,30 @@ class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
}
}
}
}
/**
 * RPC-startable flow that reports the discharge and overnight-observation counts recorded by the node's
 * [HospitalCounter] service.
 */
@StartableByRPC
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
    override fun call(): HospitalCounts {
        // Resolve the counting service once and read both counters from the same instance
        val counter = serviceHub.cordaService(HospitalCounter::class.java)
        return HospitalCounts(counter.dischargeCounter, counter.observationCounter)
    }
}
/**
 * Serializable pair of hospital counters returned to RPC callers.
 *
 * @property discharge number of discharges counted
 * @property observation number of overnight observations counted
 */
@CordaSerializable
data class HospitalCounts(
    val discharge: Int,
    val observation: Int
)
/**
 * Node service that counts how often the flow hospital discharges a flow or keeps one for overnight observation.
 *
 * The counts are collected by registering callbacks on [StaffedFlowHospital.onFlowDischarged] and
 * [StaffedFlowHospital.onFlowKeptForOvernightObservation] when the service is constructed.
 *
 * NOTE(review): the counters are plain `Int` fields incremented from the hospital callbacks; whether those callbacks can
 * run concurrently is not visible from here - confirm before relying on exact counts under load.
 */
@CordaService
class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() {
    // `services` is not used; presumably the constructor shape is required for @CordaService classes - TODO confirm
    var observationCounter: Int = 0
    var dischargeCounter: Int = 0

    init {
        // Both hooks pass the flow id and the names of the staff involved; neither is needed for counting
        StaffedFlowHospital.onFlowDischarged.add { _, _ -> dischargeCounter += 1 }
        StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> observationCounter += 1 }
    }
}

View File

@ -113,7 +113,7 @@ class SingleThreadedStateMachineManager(
private var checkpointSerializationContext: CheckpointSerializationContext? = null
private var actionExecutor: ActionExecutor? = null
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID)
override val flowHospital: StaffedFlowHospital = makeFlowHospital()
private val transitionExecutor = makeTransitionExecutor()
override val allStateMachines: List<FlowLogic<*>>
@ -817,6 +817,13 @@ class SingleThreadedStateMachineManager(
return interceptors.fold(transitionExecutor) { executor, interceptor -> interceptor(executor) }
}
/**
 * Creates the [StaffedFlowHospital] for this state machine manager from the flow messaging layer, the service hub's
 * clock and this node's sender UUID.
 */
private fun makeFlowHospital(): StaffedFlowHospital {
    // NOTE(review): this function previously computed an unused `isNotary` flag under a comment saying that notaries do
    // not retain errored session initiation requests. The hospital constructor takes no such flag, so nothing here
    // implements that behaviour - confirm whether it still needs to be wired in.
    return StaffedFlowHospital(flowMessaging, serviceHub.clock, ourSenderUUID)
}
private fun InnerState.removeFlowOrderly(
flow: Flow,
removalReason: FlowRemovalReason.OrderlyFinish,

View File

@ -17,17 +17,21 @@ import org.hibernate.exception.ConstraintViolationException
import rx.subjects.PublishSubject
import java.sql.SQLException
import java.sql.SQLTransientConnectionException
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.persistence.PersistenceException
import kotlin.math.pow
/**
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
*/
class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) {
private companion object {
class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
private val clock: Clock,
private val ourSenderUUID: String) {
companion object {
private val log = contextLogger()
private val staff = listOf(
DeadlockNurse,
@ -42,11 +46,28 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
@VisibleForTesting
val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowDischarged = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>()
}
/**
* Represents the flows that have been admitted to the hospital for treatment.
* Flows should be removed from [flowsInHospital] when they have completed a successful transition.
*/
private val flowsInHospital = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
private val mutex = ThreadBox(object {
/**
* Contains medical history of every flow (a patient) that has entered the hospital. A flow can leave the hospital,
* but their medical history will be retained.
*
* Flows should be removed from [flowPatients] when they have completed successfully. Upon successful completion,
* the medical history of a flow is no longer relevant as that flow has been completely removed from the
* statemachine.
*/
val flowPatients = HashMap<StateMachineRunId, FlowMedicalHistory>()
val treatableSessionInits = HashMap<UUID, InternalSessionInitRecord>()
val recordsPublisher = PublishSubject.create<MedicalRecord>()
@ -58,7 +79,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
* The node was unable to initiate the [InitialSessionMessage] from [sender].
*/
fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) {
val time = Instant.now()
val time = clock.instant()
val id = UUID.randomUUID()
val outcome = if (error is SessionRejectException.UnknownClass) {
// We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is
@ -111,10 +132,18 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
}
/**
* The flow running in [flowFiber] has errored.
* Request treatment for the [flowFiber]. A flow can only be added to the hospital if they are not already being
* treated.
*/
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
val time = Instant.now()
fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
    // putIfAbsent returns the existing entry when there is one, so a non-null result means the flow is already a patient
    val alreadyInHospital = flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) != null
    if (alreadyInHospital) return
    // First registration of this flow: hand it over for diagnosis
    admit(flowFiber, currentState, errors)
}
private fun admit(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
val time = clock.instant()
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
onFlowAdmitted.forEach { it.invoke(flowFiber.id) }
@ -127,6 +156,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
Diagnosis.DISCHARGE -> {
val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState)
log.info("Flow error discharged from hospital (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})")
onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff)
}
Diagnosis.OVERNIGHT_OBSERVATION -> {
@ -194,12 +224,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
private data class ConsultationReport(val error: Throwable, val diagnosis: Diagnosis, val by: List<Staff>)
/**
* The flow has been removed from the state machine.
* Remove the flow's medical history from the hospital.
*/
fun flowRemoved(flowId: StateMachineRunId) {
fun removeMedicalHistory(flowId: StateMachineRunId) {
    // flowPatients lives inside the mutex-guarded state, so it is only touched while holding the lock
    mutex.locked {
        flowPatients.remove(flowId)
    }
}
/**
 * Takes the flow with the given [id] out of the set of flows currently admitted to the hospital.
 *
 * Only the admission entry is dropped; calling this for a flow that is not admitted has no effect.
 */
fun leave(id: StateMachineRunId) {
    // Once the entry is gone, a later requestTreatment call for the same flow can admit it again
    flowsInHospital.remove(id)
}
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
/** Returns a stream of medical records as flows pass through the hospital. */
fun track(): DataFeed<List<MedicalRecord>, MedicalRecord> {

View File

@ -5,7 +5,6 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.ConcurrentHashMap
/**
* This interceptor notifies the passed in [flowHospital] in case a flow went through a clean->errored or a errored->clean
@ -21,12 +20,10 @@ class HospitalisingInterceptor(
}
private fun removeFlow(id: StateMachineRunId) {
hospitalisedFlows.remove(id)
flowHospital.flowRemoved(id)
flowHospital.leave(id)
flowHospital.removeMedicalHistory(id)
}
private val hospitalisedFlows = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
@Suspendable
override fun executeTransition(
fiber: FlowFiber,
@ -36,19 +33,17 @@ class HospitalisingInterceptor(
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
// If the fiber's previous state was clean then remove it from the [hospitalisedFlows] map
// If the fiber's previous state was clean then remove it from the hospital
// This is important for retrying a flow that has errored during a state machine transition
if(previousState.checkpoint.errorState is ErrorState.Clean) {
hospitalisedFlows.remove(fiber.id)
if (previousState.checkpoint.errorState is ErrorState.Clean) {
flowHospital.leave(fiber.id)
}
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) {
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception }
if (hospitalisedFlows.putIfAbsent(fiber.id, fiber) == null) {
flowHospital.flowErrored(fiber, previousState, exceptionsToHandle)
}
flowHospital.requestTreatment(fiber, previousState, exceptionsToHandle)
}
if (nextState.isRemoved) {
removeFlow(fiber.id)