mirror of
https://github.com/corda/corda.git
synced 2025-02-21 09:51:57 +00:00
CORDA-3720 Extract locking of InnerState out of SMM (#6289)
The state machines state is held within `InnerState` which lived inside the SMM. `InnerState` has been extracted out of the SMM to allow the SMM to be refactored in the future. Smaller classes can now be made that focus on a single goal as the locking of the state can be accessed from external classes. To achieve this, pass the `InnerState` into the class and request a lock if needed. The locking of `InnerState` has been made a property of the `InnerState` itself. It has a `lock` field that allows locks to be taken out when needed. An inline `withLock` function has been added to tidy up the code and not harm performance. Some classes have been made internal to prevent invalid usage of purely node internal classes. As part of this change, flow timeouts have been extracted out into `FlowTimeoutScheduler`.
This commit is contained in:
parent
d720c86fc7
commit
796e92b512
@ -163,4 +163,12 @@ abstract class StatemachineErrorHandlingTest {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal val actionExecutorClassName: String by lazy {
|
||||
Class.forName("net.corda.node.services.statemachine.ActionExecutorImpl").name
|
||||
}
|
||||
|
||||
internal val stateMachineManagerClassName: String by lazy {
|
||||
Class.forName("net.corda.node.services.statemachine.SingleThreadedStateMachineManager").name
|
||||
}
|
||||
}
|
@ -185,7 +185,7 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -201,7 +201,7 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("finality_flag") && readCounter("counter") < 5
|
||||
@ -280,7 +280,7 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -296,7 +296,7 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("finality_flag") && readCounter("counter") < 7
|
||||
|
@ -40,7 +40,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -48,7 +48,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeSendMultiple action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 4
|
||||
@ -123,7 +123,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -131,7 +131,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeSendMultiple action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 3
|
||||
@ -206,7 +206,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Set flag when inside executeAcknowledgeMessages
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeAcknowledgeMessages
|
||||
AT INVOKE ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction()
|
||||
IF !flagged("exception_flag")
|
||||
@ -294,7 +294,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -302,7 +302,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 5
|
||||
@ -384,7 +384,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -392,7 +392,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 7
|
||||
@ -474,7 +474,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
// seems to be restarting the flow from the beginning every time
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -490,7 +490,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 5
|
||||
@ -498,7 +498,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Set flag when executing first commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("suspend_flag") && !flagged("commit_flag")
|
||||
@ -574,7 +574,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
// seems to be restarting the flow from the beginning every time
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -590,7 +590,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction when removing checkpoint
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
|
||||
@ -673,7 +673,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag")
|
||||
@ -681,7 +681,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Set flag when executing first commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("suspend_flag") && !flagged("commit_flag")
|
||||
@ -689,7 +689,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on retry
|
||||
CLASS ${SingleThreadedStateMachineManager::class.java.name}
|
||||
CLASS $stateMachineManagerClassName
|
||||
METHOD addAndStartFlow
|
||||
AT ENTRY
|
||||
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag")
|
||||
@ -776,7 +776,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag")
|
||||
@ -784,7 +784,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Set flag when executing first commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("suspend_flag") && !flagged("commit_flag")
|
||||
@ -792,7 +792,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on retry
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("suspend_flag") && flagged("commit_exception_flag") && !flagged("retry_exception_flag")
|
||||
@ -871,7 +871,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF !flagged("commit_exception_flag")
|
||||
@ -879,7 +879,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on retry
|
||||
CLASS ${SingleThreadedStateMachineManager::class.java.name}
|
||||
CLASS $stateMachineManagerClassName
|
||||
METHOD onExternalStartFlow
|
||||
AT ENTRY
|
||||
IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
|
||||
@ -955,7 +955,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -971,7 +971,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction when removing checkpoint
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 4
|
||||
@ -1056,7 +1056,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -1064,7 +1064,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 5
|
||||
@ -1151,7 +1151,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -1159,7 +1159,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 7
|
||||
@ -1247,7 +1247,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -1255,7 +1255,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 5
|
||||
@ -1345,7 +1345,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -1353,7 +1353,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 7
|
||||
@ -1436,7 +1436,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -1452,7 +1452,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction when removing checkpoint
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
|
||||
@ -1540,7 +1540,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -1548,7 +1548,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 5
|
||||
@ -1638,7 +1638,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -1646,7 +1646,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 7
|
||||
|
@ -206,7 +206,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -214,7 +214,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeSendMultiple action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 4
|
||||
|
@ -45,7 +45,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -69,7 +69,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("subflow_flag") && flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 5
|
||||
@ -77,7 +77,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Set flag when executing first commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("subflow_flag") && flagged("suspend_flag") && !flagged("commit_flag")
|
||||
@ -155,7 +155,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -179,7 +179,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("subflow_flag") && flagged("suspend_flag") && readCounter("counter") < 5
|
||||
@ -257,7 +257,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -273,7 +273,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("subflow_flag") && readCounter("counter") < 5
|
||||
@ -351,7 +351,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
@ -367,7 +367,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("subflow_flag") && flagged("commit_flag") && readCounter("counter") < 5
|
||||
@ -375,7 +375,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
ENDRULE
|
||||
|
||||
RULE Set flag when executing first commit
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF flagged("subflow_flag") && !flagged("commit_flag")
|
||||
|
@ -19,7 +19,7 @@ import java.time.Duration
|
||||
/**
|
||||
* This is the bottom execution engine of flow side-effects.
|
||||
*/
|
||||
class ActionExecutorImpl(
|
||||
internal class ActionExecutorImpl(
|
||||
private val services: ServiceHubInternal,
|
||||
private val checkpointStorage: CheckpointStorage,
|
||||
private val flowMessaging: FlowMessaging,
|
||||
|
@ -9,7 +9,7 @@ import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
internal class FlowSleepScheduler(private val smm: StateMachineManagerInternal, private val scheduledExecutor: ScheduledExecutorService) {
|
||||
internal class FlowSleepScheduler(private val innerState: StateMachineInnerState, private val scheduledExecutor: ScheduledExecutorService) {
|
||||
|
||||
private companion object {
|
||||
val log = contextLogger()
|
||||
@ -31,15 +31,6 @@ internal class FlowSleepScheduler(private val smm: StateMachineManagerInternal,
|
||||
currentState.future = setAlarmClock(fiber, duration)
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule a wake up event.
|
||||
*
|
||||
* @param fiber The [FlowFiber] to schedule a wake up event for
|
||||
*/
|
||||
fun scheduleWakeUp(fiber: FlowFiber) {
|
||||
fiber.scheduleEvent(Event.WakeUpFromSleep)
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel a sleeping flow's future. Note, this does not cause the flow to wake up.
|
||||
*
|
||||
@ -64,15 +55,24 @@ internal class FlowSleepScheduler(private val smm: StateMachineManagerInternal,
|
||||
|
||||
private fun setAlarmClock(fiber: FlowFiber, duration: Duration): ScheduledFuture<Unit> {
|
||||
val instance = fiber.instanceId
|
||||
log.debug { "Putting flow to sleep for $duration" }
|
||||
log.debug { "Putting flow ${instance.runId} to sleep for $duration" }
|
||||
return scheduledExecutor.schedule<Unit>(
|
||||
{
|
||||
log.debug { "Scheduling flow wake up event for flow ${instance.runId}" }
|
||||
// This passes back into the SMM to check that the fiber that went to sleep is the same fiber that is now being scheduled
|
||||
// with the wake up event
|
||||
smm.scheduleFlowWakeUp(instance)
|
||||
scheduleWakeUp(instance)
|
||||
},
|
||||
duration.toMillis(), TimeUnit.MILLISECONDS
|
||||
)
|
||||
}
|
||||
|
||||
private fun scheduleWakeUp(instance: StateMachineInstanceId) {
|
||||
innerState.withLock {
|
||||
flows[instance.runId]?.let { flow ->
|
||||
// Only schedule a wake up event if the fiber the flow is executing on has not changed
|
||||
if (flow.fiber.instanceId == instance) {
|
||||
flow.fiber.scheduleEvent(Event.WakeUpFromSleep)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
internal class FlowTimeoutScheduler(
|
||||
private val innerState: StateMachineInnerState,
|
||||
private val scheduledExecutor: ScheduledExecutorService,
|
||||
private val serviceHub: ServiceHubInternal
|
||||
) {
|
||||
|
||||
private companion object {
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules the flow [flowId] to be retried if it does not finish within the timeout period
|
||||
* specified in the config.
|
||||
*
|
||||
* @param flowId The id of the flow that the timeout is scheduled for
|
||||
*/
|
||||
fun timeout(flowId: StateMachineRunId) {
|
||||
timeout(flowId) { flow, retryCount ->
|
||||
val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount))
|
||||
ScheduledTimeout(scheduledFuture, retryCount + 1)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel a flow's timeout future.
|
||||
*
|
||||
* @param flowId The flow's id
|
||||
*/
|
||||
fun cancel(flowId: StateMachineRunId) {
|
||||
innerState.withLock {
|
||||
timedFlows[flowId]?.let { (future, _) ->
|
||||
future.cancelIfRunning()
|
||||
timedFlows.remove(flowId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets a flow's timeout with the input timeout duration, only if it is longer than the default flow timeout configuration.
|
||||
*
|
||||
* @param flowId The flow's id
|
||||
* @param timeoutSeconds The custom timeout
|
||||
*/
|
||||
fun resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
|
||||
if (timeoutSeconds < serviceHub.configuration.flowTimeout.timeout.seconds) {
|
||||
log.debug { "Ignoring request to set time-out on timed flow $flowId to $timeoutSeconds seconds which is shorter than default of ${serviceHub.configuration.flowTimeout.timeout.seconds} seconds." }
|
||||
return
|
||||
}
|
||||
log.debug { "Processing request to set time-out on timed flow $flowId to $timeoutSeconds seconds." }
|
||||
timeout(flowId) { flow, retryCount ->
|
||||
val scheduledFuture = scheduleTimeoutException(flow, timeoutSeconds)
|
||||
ScheduledTimeout(scheduledFuture, retryCount)
|
||||
}
|
||||
}
|
||||
|
||||
private inline fun timeout(flowId: StateMachineRunId, timeout: (flow: Flow<*>, retryCount: Int) -> ScheduledTimeout) {
|
||||
innerState.withLock {
|
||||
val flow = flows[flowId]
|
||||
if (flow != null) {
|
||||
val retryCount = timedFlows[flowId]?.let { (future, retryCount) ->
|
||||
future.cancelIfRunning()
|
||||
retryCount
|
||||
} ?: 0
|
||||
timedFlows[flowId] = timeout(flow, retryCount)
|
||||
} else {
|
||||
log.warn("Unable to schedule timeout for flow $flowId – flow not found.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
|
||||
private fun scheduleTimeoutException(flow: Flow<*>, delay: Long): ScheduledFuture<*> {
|
||||
return scheduledExecutor.schedule({
|
||||
val event = Event.Error(FlowTimeoutException())
|
||||
flow.fiber.scheduleEvent(event)
|
||||
}, delay, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
|
||||
return serviceHub.configuration.flowTimeout.run {
|
||||
val timeoutDelaySeconds =
|
||||
timeout.seconds * Math.pow(backoffBase, Integer.min(retryCount, maxRestartCount).toDouble()).toLong()
|
||||
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
|
||||
}
|
||||
}
|
||||
|
||||
private fun Future<*>.cancelIfRunning() {
|
||||
if (!isDone) cancel(true)
|
||||
}
|
||||
}
|
@ -13,10 +13,8 @@ import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.castIfPossible
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.mapError
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
@ -49,8 +47,6 @@ import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||
import net.corda.serialization.internal.withTokenContext
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.lang.Integer.min
|
||||
import java.security.SecureRandom
|
||||
import java.time.Duration
|
||||
import java.util.HashSet
|
||||
@ -58,8 +54,6 @@ import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.collections.component1
|
||||
import kotlin.collections.component2
|
||||
@ -71,7 +65,7 @@ import kotlin.streams.toList
|
||||
* thread actually starts them via [deliverExternalEvent].
|
||||
*/
|
||||
@ThreadSafe
|
||||
class SingleThreadedStateMachineManager(
|
||||
internal class SingleThreadedStateMachineManager(
|
||||
val serviceHub: ServiceHubInternal,
|
||||
private val checkpointStorage: CheckpointStorage,
|
||||
val executor: ExecutorService,
|
||||
@ -84,27 +78,7 @@ class SingleThreadedStateMachineManager(
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
private data class ScheduledTimeout(
|
||||
/** Will fire a [FlowTimeoutException] indicating to the flow hospital to restart the flow. */
|
||||
val scheduledFuture: ScheduledFuture<*>,
|
||||
/** Specifies the number of times this flow has been retried. */
|
||||
val retryCount: Int = 0
|
||||
)
|
||||
|
||||
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
|
||||
// property.
|
||||
private class InnerState {
|
||||
val changesPublisher = PublishSubject.create<StateMachineManager.Change>()!!
|
||||
/** True if we're shutting down, so don't resume anything. */
|
||||
var stopping = false
|
||||
val flows = HashMap<StateMachineRunId, Flow<*>>()
|
||||
val pausedFlows = HashMap<StateMachineRunId, NonResidentFlow>()
|
||||
val startedFutures = HashMap<StateMachineRunId, OpenFuture<Unit>>()
|
||||
/** Flows scheduled to be retried if not finished within the specified timeout period. */
|
||||
val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
private val innerState = StateMachineInnerStateImpl()
|
||||
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
|
||||
private val scheduledFutureExecutor = Executors.newSingleThreadScheduledExecutor(
|
||||
ThreadFactoryBuilder().setNameFormat("flow-scheduled-future-thread").setDaemon(true).build()
|
||||
@ -115,7 +89,8 @@ class SingleThreadedStateMachineManager(
|
||||
private val metrics = serviceHub.monitoringService.metrics
|
||||
private val sessionToFlow = ConcurrentHashMap<SessionId, StateMachineRunId>()
|
||||
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
|
||||
private val flowSleepScheduler = FlowSleepScheduler(this, scheduledFutureExecutor)
|
||||
private val flowSleepScheduler = FlowSleepScheduler(innerState, scheduledFutureExecutor)
|
||||
private val flowTimeoutScheduler = FlowTimeoutScheduler(innerState, scheduledFutureExecutor, serviceHub)
|
||||
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
|
||||
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
|
||||
|
||||
@ -126,7 +101,7 @@ class SingleThreadedStateMachineManager(
|
||||
private val transitionExecutor = makeTransitionExecutor()
|
||||
|
||||
override val allStateMachines: List<FlowLogic<*>>
|
||||
get() = mutex.locked { flows.values.map { it.fiber.logic } }
|
||||
get() = innerState.withLock { flows.values.map { it.fiber.logic } }
|
||||
|
||||
private val totalStartedFlows = metrics.counter("Flows.Started")
|
||||
private val totalFinishedFlows = metrics.counter("Flows.Finished")
|
||||
@ -137,7 +112,7 @@ class SingleThreadedStateMachineManager(
|
||||
*
|
||||
* We use assignment here so that multiple subscribers share the same wrapped Observable.
|
||||
*/
|
||||
override val changes: Observable<StateMachineManager.Change> = mutex.content.changesPublisher
|
||||
override val changes: Observable<StateMachineManager.Change> = innerState.changesPublisher
|
||||
|
||||
override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): CordaFuture<Unit> {
|
||||
checkQuasarJavaAgentPresence()
|
||||
@ -157,19 +132,20 @@ class SingleThreadedStateMachineManager(
|
||||
StateMachineManager.StartMode.Safe -> markAllFlowsAsPaused()
|
||||
}
|
||||
this.flowCreator = FlowCreator(
|
||||
checkpointSerializationContext,
|
||||
checkpointStorage,
|
||||
scheduler,
|
||||
database,
|
||||
transitionExecutor,
|
||||
actionExecutor,
|
||||
secureRandom,
|
||||
serviceHub,
|
||||
unfinishedFibers,
|
||||
::resetCustomTimeout)
|
||||
checkpointSerializationContext,
|
||||
checkpointStorage,
|
||||
scheduler,
|
||||
database,
|
||||
transitionExecutor,
|
||||
actionExecutor,
|
||||
secureRandom,
|
||||
serviceHub,
|
||||
unfinishedFibers,
|
||||
flowTimeoutScheduler::resetCustomTimeout
|
||||
)
|
||||
|
||||
val fibers = restoreFlowsFromCheckpoints()
|
||||
metrics.register("Flows.InFlight", Gauge<Int> { mutex.content.flows.size })
|
||||
metrics.register("Flows.InFlight", Gauge<Int> { innerState.flows.size })
|
||||
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
|
||||
if (throwable is VirtualMachineError) {
|
||||
errorAndTerminate("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", throwable)
|
||||
@ -179,7 +155,7 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
val pausedFlows = restoreNonResidentFlowsFromPausedCheckpoints()
|
||||
mutex.locked {
|
||||
innerState.withLock {
|
||||
this.pausedFlows.putAll(pausedFlows)
|
||||
for ((id, flow) in pausedFlows) {
|
||||
val checkpoint = flow.checkpoint
|
||||
@ -199,10 +175,10 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
}
|
||||
|
||||
override fun snapshot(): Set<FlowStateMachineImpl<*>> = mutex.content.flows.values.map { it.fiber }.toSet()
|
||||
override fun snapshot(): Set<FlowStateMachineImpl<*>> = innerState.flows.values.map { it.fiber }.toSet()
|
||||
|
||||
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
|
||||
return mutex.locked {
|
||||
return innerState.withLock {
|
||||
flows.values.mapNotNull {
|
||||
flowClass.castIfPossible(it.fiber.logic)?.let { it to it.stateMachine.resultFuture }
|
||||
}
|
||||
@ -217,7 +193,7 @@ class SingleThreadedStateMachineManager(
|
||||
*/
|
||||
override fun stop(allowedUnsuspendedFiberCount: Int) {
|
||||
require(allowedUnsuspendedFiberCount >= 0){"allowedUnsuspendedFiberCount must be greater than or equal to zero"}
|
||||
mutex.locked {
|
||||
innerState.withLock {
|
||||
if (stopping) throw IllegalStateException("Already stopping!")
|
||||
stopping = true
|
||||
for ((_, flow) in flows) {
|
||||
@ -241,7 +217,7 @@ class SingleThreadedStateMachineManager(
|
||||
* calls to [allStateMachines]
|
||||
*/
|
||||
override fun track(): DataFeed<List<FlowLogic<*>>, StateMachineManager.Change> {
|
||||
return mutex.locked {
|
||||
return innerState.withMutex {
|
||||
database.transaction {
|
||||
DataFeed(flows.values.map { it.fiber.logic }, changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction(database))
|
||||
}
|
||||
@ -266,7 +242,7 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
override fun killFlow(id: StateMachineRunId): Boolean {
|
||||
val killFlowResult = mutex.locked {
|
||||
val killFlowResult = innerState.withLock {
|
||||
val flow = flows[id]
|
||||
if (flow != null) {
|
||||
logger.info("Killing flow $id known to this node.")
|
||||
@ -281,7 +257,7 @@ class SingleThreadedStateMachineManager(
|
||||
unfinishedFibers.countDown()
|
||||
|
||||
val state = flow.fiber.transientState
|
||||
return@locked if (state != null) {
|
||||
return@withLock if (state != null) {
|
||||
state.value.isKilled = true
|
||||
flow.fiber.scheduleEvent(Event.DoRemainingWork)
|
||||
true
|
||||
@ -333,9 +309,9 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
override fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState) {
|
||||
mutex.locked {
|
||||
cancelTimeoutIfScheduled(flowId)
|
||||
cancelFlowSleep(lastState)
|
||||
innerState.withLock {
|
||||
flowTimeoutScheduler.cancel(flowId)
|
||||
flowSleepScheduler.cancel(lastState)
|
||||
val flow = flows.remove(flowId)
|
||||
if (flow != null) {
|
||||
decrementLiveFibers()
|
||||
@ -352,7 +328,7 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
override fun signalFlowHasStarted(flowId: StateMachineRunId) {
|
||||
mutex.locked {
|
||||
innerState.withLock {
|
||||
startedFutures.remove(flowId)?.set(Unit)
|
||||
flows[flowId]?.let { flow ->
|
||||
changesPublisher.onNext(StateMachineManager.Change.Add(flow.fiber.logic))
|
||||
@ -378,7 +354,7 @@ class SingleThreadedStateMachineManager(
|
||||
return checkpointStorage.getCheckpointsToRun().use {
|
||||
it.mapNotNull { (id, serializedCheckpoint) ->
|
||||
// If a flow is added before start() then don't attempt to restore it
|
||||
mutex.locked { if (id in flows) return@mapNotNull null }
|
||||
innerState.withLock { if (id in flows) return@mapNotNull null }
|
||||
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@mapNotNull null
|
||||
flowCreator.createFlowFromCheckpoint(id, checkpoint)
|
||||
}.toList()
|
||||
@ -403,11 +379,11 @@ class SingleThreadedStateMachineManager(
|
||||
|
||||
@Suppress("TooGenericExceptionCaught", "ComplexMethod", "MaxLineLength") // this is fully intentional here, see comment in the catch clause
|
||||
override fun retryFlowFromSafePoint(currentState: StateMachineState) {
|
||||
cancelFlowSleep(currentState)
|
||||
flowSleepScheduler.cancel(currentState)
|
||||
// Get set of external events
|
||||
val flowId = currentState.flowLogic.runId
|
||||
try {
|
||||
val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
|
||||
val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
|
||||
if (oldFlowLeftOver == null) {
|
||||
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
|
||||
return
|
||||
@ -428,7 +404,7 @@ class SingleThreadedStateMachineManager(
|
||||
// Just flow initiation message
|
||||
null
|
||||
}
|
||||
mutex.locked {
|
||||
innerState.withLock {
|
||||
if (stopping) {
|
||||
return
|
||||
}
|
||||
@ -467,7 +443,7 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
override fun deliverExternalEvent(event: ExternalEvent) {
|
||||
mutex.locked {
|
||||
innerState.withLock {
|
||||
if (!stopping) {
|
||||
when (event) {
|
||||
is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event)
|
||||
@ -527,7 +503,7 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
} else {
|
||||
val event = Event.DeliverSessionMessage(sessionMessage, deduplicationHandler, sender)
|
||||
mutex.locked {
|
||||
innerState.withLock {
|
||||
flows[flowId]?.run { fiber.scheduleEvent(event) }
|
||||
// If flow is not running add it to the list of external events to be processed if/when the flow resumes.
|
||||
?: pausedFlows[flowId]?.run { addExternalEvent(event) }
|
||||
@ -623,7 +599,7 @@ class SingleThreadedStateMachineManager(
|
||||
deduplicationHandler: DeduplicationHandler?
|
||||
): CordaFuture<FlowStateMachine<A>> {
|
||||
|
||||
val existingFlow = mutex.locked { flows[flowId] }
|
||||
val existingFlow = innerState.withLock { flows[flowId] }
|
||||
val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState?.value?.isAnyCheckpointPersisted == true) {
|
||||
// Load the flow's checkpoint
|
||||
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
|
||||
@ -648,7 +624,7 @@ class SingleThreadedStateMachineManager(
|
||||
|
||||
val flow = flowCreator.createFlowFromLogic(flowId, invocationContext, flowLogic, flowStart, ourIdentity, existingCheckpoint, deduplicationHandler, ourSenderUUID)
|
||||
val startedFuture = openFuture<Unit>()
|
||||
mutex.locked {
|
||||
innerState.withLock {
|
||||
startedFutures[flowId] = startedFuture
|
||||
}
|
||||
totalStartedFlows.inc()
|
||||
@ -657,110 +633,17 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
override fun scheduleFlowTimeout(flowId: StateMachineRunId) {
|
||||
mutex.locked { scheduleTimeout(flowId) }
|
||||
flowTimeoutScheduler.timeout(flowId)
|
||||
}
|
||||
|
||||
override fun cancelFlowTimeout(flowId: StateMachineRunId) {
|
||||
mutex.locked { cancelTimeoutIfScheduled(flowId) }
|
||||
flowTimeoutScheduler.cancel(flowId)
|
||||
}
|
||||
|
||||
override fun scheduleFlowSleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration) {
|
||||
flowSleepScheduler.sleep(fiber, currentState, duration)
|
||||
}
|
||||
|
||||
override fun scheduleFlowWakeUp(instanceId: StateMachineInstanceId) {
|
||||
mutex.locked {
|
||||
flows[instanceId.runId]?.let { flow ->
|
||||
// Only schedule a wake up event if the fiber the flow is executing on has not changed
|
||||
if (flow.fiber.instanceId == instanceId) {
|
||||
flowSleepScheduler.scheduleWakeUp(flow.fiber)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun cancelFlowSleep(currentState: StateMachineState) {
|
||||
flowSleepScheduler.cancel(currentState)
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules the flow [flowId] to be retried if it does not finish within the timeout period
|
||||
* specified in the config.
|
||||
*
|
||||
* Assumes lock is taken on the [InnerState].
|
||||
*/
|
||||
private fun InnerState.scheduleTimeout(flowId: StateMachineRunId) {
|
||||
val flow = flows[flowId]
|
||||
if (flow != null) {
|
||||
val scheduledTimeout = timedFlows[flowId]
|
||||
val retryCount = if (scheduledTimeout != null) {
|
||||
val timeoutFuture = scheduledTimeout.scheduledFuture
|
||||
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
|
||||
scheduledTimeout.retryCount
|
||||
} else 0
|
||||
val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount))
|
||||
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1)
|
||||
} else {
|
||||
logger.warn("Unable to schedule timeout for flow $flowId – flow not found.")
|
||||
}
|
||||
}
|
||||
|
||||
private fun resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
|
||||
if (timeoutSeconds < serviceHub.configuration.flowTimeout.timeout.seconds) {
|
||||
logger.debug { "Ignoring request to set time-out on timed flow $flowId to $timeoutSeconds seconds which is shorter than default of ${serviceHub.configuration.flowTimeout.timeout.seconds} seconds." }
|
||||
return
|
||||
}
|
||||
logger.debug { "Processing request to set time-out on timed flow $flowId to $timeoutSeconds seconds." }
|
||||
mutex.locked {
|
||||
resetCustomTimeout(flowId, timeoutSeconds)
|
||||
}
|
||||
}
|
||||
|
||||
private fun InnerState.resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
|
||||
val flow = flows[flowId]
|
||||
if (flow != null) {
|
||||
val scheduledTimeout = timedFlows[flowId]
|
||||
val retryCount = if (scheduledTimeout != null) {
|
||||
val timeoutFuture = scheduledTimeout.scheduledFuture
|
||||
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
|
||||
scheduledTimeout.retryCount
|
||||
} else 0
|
||||
val scheduledFuture = scheduleTimeoutException(flow, timeoutSeconds)
|
||||
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount)
|
||||
} else {
|
||||
logger.warn("Unable to schedule timeout for flow $flowId – flow not found.")
|
||||
}
|
||||
}
|
||||
|
||||
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
|
||||
private fun scheduleTimeoutException(flow: Flow<*>, delay: Long): ScheduledFuture<*> {
|
||||
return with(serviceHub.configuration.flowTimeout) {
|
||||
scheduledFutureExecutor.schedule({
|
||||
val event = Event.Error(FlowTimeoutException())
|
||||
flow.fiber.scheduleEvent(event)
|
||||
}, delay, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
|
||||
return with(serviceHub.configuration.flowTimeout) {
|
||||
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, min(retryCount, maxRestartCount).toDouble()).toLong()
|
||||
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels any scheduled flow timeout for [flowId].
|
||||
*
|
||||
* Assumes lock is taken on the [InnerState].
|
||||
*/
|
||||
private fun InnerState.cancelTimeoutIfScheduled(flowId: StateMachineRunId) {
|
||||
timedFlows[flowId]?.let { (future, _) ->
|
||||
if (!future.isDone) future.cancel(true)
|
||||
timedFlows.remove(flowId)
|
||||
}
|
||||
}
|
||||
|
||||
private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? {
|
||||
return try {
|
||||
serializedCheckpoint.deserialize(checkpointSerializationContext!!)
|
||||
@ -775,7 +658,7 @@ class SingleThreadedStateMachineManager(
|
||||
for (sessionId in getFlowSessionIds(checkpoint)) {
|
||||
sessionToFlow[sessionId] = id
|
||||
}
|
||||
mutex.locked {
|
||||
innerState.withLock {
|
||||
if (stopping) {
|
||||
startedFutures[id]?.setException(IllegalStateException("Will not start flow as SMM is stopping"))
|
||||
logger.trace("Not resuming as SMM is stopping.")
|
||||
@ -788,7 +671,7 @@ class SingleThreadedStateMachineManager(
|
||||
oldFlow.resultFuture.captureLater(flow.resultFuture)
|
||||
}
|
||||
val flowLogic = flow.fiber.logic
|
||||
if (flowLogic.isEnabledTimedFlow()) scheduleTimeout(id)
|
||||
if (flowLogic.isEnabledTimedFlow()) flowTimeoutScheduler.timeout(id)
|
||||
flow.fiber.scheduleEvent(Event.DoRemainingWork)
|
||||
startOrResume(checkpoint, flow)
|
||||
}
|
||||
@ -848,7 +731,7 @@ class SingleThreadedStateMachineManager(
|
||||
return StaffedFlowHospital(flowMessaging, serviceHub.clock, ourSenderUUID)
|
||||
}
|
||||
|
||||
private fun InnerState.removeFlowOrderly(
|
||||
private fun StateMachineInnerState.removeFlowOrderly(
|
||||
flow: Flow<*>,
|
||||
removalReason: FlowRemovalReason.OrderlyFinish,
|
||||
lastState: StateMachineState
|
||||
@ -864,7 +747,7 @@ class SingleThreadedStateMachineManager(
|
||||
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Success(removalReason.flowReturnValue)))
|
||||
}
|
||||
|
||||
private fun InnerState.removeFlowError(
|
||||
private fun StateMachineInnerState.removeFlowError(
|
||||
flow: Flow<*>,
|
||||
removalReason: FlowRemovalReason.ErrorFinish,
|
||||
lastState: StateMachineState
|
||||
|
@ -0,0 +1,44 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.node.services.statemachine.StateMachineManager.Change
|
||||
import rx.subjects.PublishSubject
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.locks.Lock
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
internal interface StateMachineInnerState {
|
||||
val lock: Lock
|
||||
val flows: MutableMap<StateMachineRunId, Flow<*>>
|
||||
val pausedFlows: MutableMap<StateMachineRunId, NonResidentFlow>
|
||||
val startedFutures: MutableMap<StateMachineRunId, OpenFuture<Unit>>
|
||||
val changesPublisher: PublishSubject<Change>
|
||||
/** Flows scheduled to be retried if not finished within the specified timeout period. */
|
||||
val timedFlows: MutableMap<StateMachineRunId, ScheduledTimeout>
|
||||
|
||||
fun <R> withMutex(block: StateMachineInnerState.() -> R): R
|
||||
}
|
||||
|
||||
internal class StateMachineInnerStateImpl : StateMachineInnerState {
|
||||
/** True if we're shutting down, so don't resume anything. */
|
||||
var stopping = false
|
||||
override val lock = ReentrantLock()
|
||||
override val changesPublisher = PublishSubject.create<Change>()!!
|
||||
override val flows = HashMap<StateMachineRunId, Flow<*>>()
|
||||
override val pausedFlows = HashMap<StateMachineRunId, NonResidentFlow>()
|
||||
override val startedFutures = HashMap<StateMachineRunId, OpenFuture<Unit>>()
|
||||
override val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
|
||||
|
||||
override fun <R> withMutex(block: StateMachineInnerState.() -> R): R = lock.withLock { block(this) }
|
||||
}
|
||||
|
||||
internal inline fun <reified T : StateMachineInnerState, R> T.withLock(block: T.() -> R): R = lock.withLock { block(this) }
|
||||
|
||||
internal data class ScheduledTimeout(
|
||||
/** Will fire a [FlowTimeoutException] indicating to the flow hospital to restart the flow. */
|
||||
val scheduledFuture: ScheduledFuture<*>,
|
||||
/** Specifies the number of times this flow has been retried. */
|
||||
val retryCount: Int = 0
|
||||
)
|
@ -102,7 +102,7 @@ interface StateMachineManager {
|
||||
|
||||
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call
|
||||
// these functions again
|
||||
interface StateMachineManagerInternal {
|
||||
internal interface StateMachineManagerInternal {
|
||||
fun signalFlowHasStarted(flowId: StateMachineRunId)
|
||||
fun addSessionBinding(flowId: StateMachineRunId, sessionId: SessionId)
|
||||
fun removeSessionBindings(sessionIds: Set<SessionId>)
|
||||
@ -111,7 +111,6 @@ interface StateMachineManagerInternal {
|
||||
fun scheduleFlowTimeout(flowId: StateMachineRunId)
|
||||
fun cancelFlowTimeout(flowId: StateMachineRunId)
|
||||
fun scheduleFlowSleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration)
|
||||
fun scheduleFlowWakeUp(instanceId: StateMachineInstanceId)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -14,7 +14,7 @@ import java.lang.reflect.Field
|
||||
* If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well.
|
||||
*/
|
||||
//TODO: instead of replacing the progress tracker after constructing the flow logic, we should inject it during fiber deserialization
|
||||
fun StateMachineManagerInternal.injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
|
||||
internal fun StateMachineManagerInternal.injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
|
||||
if (oldTracker != null) {
|
||||
val newTracker = newFlowLogic.progressTracker
|
||||
if (newTracker != null) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user