public class StateMachineManager
A StateMachineManager is responsible for coordinating and persisting multiple FlowStateMachine objects.
Each such object represents an instantiation of a (two-party) flow that has reached a particular point.
An implementation of this class will persist state machines to long term storage so they can survive process restarts and, if run with a single-threaded executor, will ensure no two state machines run concurrently with each other (bad for performance, good for programmer mental health!).
A "state machine" is a class with a single call method. The call method and any others it invokes are rewritten by a bytecode rewriting engine called Quasar, to ensure the code can be suspended and resumed at any point.
The SMM will always invoke the flow fibers on the given AffinityExecutor, regardless of which thread actually starts them via add.
TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised continuation is always unique?
TODO: Think about how to bring the system to a clean stop so it can be upgraded without any serialised stacks on disk.
TODO: Timeouts.
TODO: Surfacing of exceptions via an API and/or management UI.
TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt.
TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
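The claim that a single-threaded executor prevents two state machines from running concurrently can be illustrated without any Corda types. The following is a minimal sketch using only `java.util.concurrent`; the class name `SerialExecutorSketch` and its method are hypothetical and are not part of this API. Tasks are submitted from several caller threads, yet they all run one at a time on the executor's single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: plain JDK executor, no Corda types involved.
class SerialExecutorSketch {

    // Submits taskCount tasks from separate caller threads to one single-threaded
    // executor and returns the highest number of tasks observed running at once.
    static int maxConcurrentTasks(int taskCount) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        List<Thread> callers = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            // Each caller thread only submits; the work itself runs on the executor thread.
            Thread caller = new Thread(() -> executor.execute(() -> {
                int now = running.incrementAndGet();
                maxRunning.accumulateAndGet(now, Math::max);
                Thread.yield();
                running.decrementAndGet();
            }));
            callers.add(caller);
            caller.start();
        }
        try {
            for (Thread caller : callers) {
                caller.join(); // all tasks are submitted once every caller has finished
            }
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        // prints "max concurrent tasks: 1"
        System.out.println("max concurrent tasks: " + maxConcurrentTasks(8));
    }
}
```

The trade-off is the one the description names: throughput is limited to one task at a time, but task code never has to reason about interleaving with another task.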
Modifier and Type | Class and Description |
---|---|
static class | StateMachineManager.Change |
static class | StateMachineManager.Companion |
class | StateMachineManager.FiberScheduler |
static class | StateMachineManager.FlowSession |
static class | StateMachineManager.FlowSessionState Describes the session's state. |
Modifier and Type | Field and Description |
---|---|
static StateMachineManager.Companion | Companion |
Constructor and Description |
---|
StateMachineManager(ServiceHubInternal serviceHub, java.util.List<? extends java.lang.Object> tokenizableServices, CheckpointStorage checkpointStorage, AffinityExecutor executor, org.jetbrains.exposed.sql.Database database, org.apache.activemq.artemis.utils.ReusableLatch unfinishedFibers) A StateMachineManager is responsible for coordinating and persisting multiple FlowStateMachine objects. Each such object represents an instantiation of a (two-party) flow that has reached a particular point. |
Modifier and Type | Method and Description |
---|---|
<T> FlowStateMachine<T> | add(FlowLogic<? extends T> logic) Kicks off a brand new state machine of the given class. The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is restarted with checkpointed state machines in the storage service. |
<P extends FlowLogic<? extends T>,T> java.util.List<kotlin.Pair> | findStateMachines(java.lang.Class<P> flowClass) Returns a list of all state machines executing the given flow logic at the top level (subflows do not count). |
java.util.List<net.corda.core.flows.FlowLogic> | getAllStateMachines() |
rx.Observable<net.corda.node.services.statemachine.StateMachineManager.Change> | getChanges() An observable that emits triples of the changing flow, the type of change, and a process-specific ID number which may change across restarts. |
CheckpointStorage | getCheckpointStorage() |
org.jetbrains.exposed.sql.Database | getDatabase() |
AffinityExecutor | getExecutor() |
StateMachineManager.FiberScheduler | getScheduler() |
ServiceHubInternal | getServiceHub() |
void | start() |
void | stop(int allowedUnsuspendedFiberCount) Start the shutdown process, bringing the StateMachineManager to a controlled stop. When this method returns, all Fibers have been suspended and checkpointed, or have completed. |
kotlin.Pair<java.util.List,rx.Observable> | track() Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to changes and calls to allStateMachines. |
public static StateMachineManager.Companion Companion
public StateMachineManager(ServiceHubInternal serviceHub, java.util.List<? extends java.lang.Object> tokenizableServices, CheckpointStorage checkpointStorage, AffinityExecutor executor, org.jetbrains.exposed.sql.Database database, org.apache.activemq.artemis.utils.ReusableLatch unfinishedFibers)
public StateMachineManager.FiberScheduler getScheduler()
public <P extends FlowLogic<? extends T>,T> java.util.List<kotlin.Pair> findStateMachines(java.lang.Class<P> flowClass)
Returns a list of all state machines executing the given flow logic at the top level (subflows do not count)
public java.util.List<net.corda.core.flows.FlowLogic> getAllStateMachines()
public rx.Observable<net.corda.node.services.statemachine.StateMachineManager.Change> getChanges()
An observable that emits triples of the changing flow, the type of change, and a process-specific ID number which may change across restarts.
We use assignment here so that multiple subscribers share the same wrapped Observable.
public void start()
public void stop(int allowedUnsuspendedFiberCount)
Start the shutdown process, bringing the StateMachineManager to a controlled stop. When this method returns, all Fibers have been suspended and checkpointed, or have completed.
allowedUnsuspendedFiberCount - Optional parameter, used in some tests.
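This page does not show how stop waits for outstanding fibers. The constructor does take a latch named unfinishedFibers, so one plausible shape is a counter that blocks the caller until no more than the allowed number of fibers remain. The sketch below is an assumption built on a plain Java monitor; `UnfinishedFiberCounter` and its methods are hypothetical names and do not reproduce the Artemis ReusableLatch API or Corda's actual shutdown logic.

```java
// Hypothetical stand-in for a "fibers still running" counter.
class UnfinishedFiberCounter {
    private int count;

    // Called when a fiber begins running.
    synchronized void fiberStarted() {
        count++;
    }

    // Called when a fiber completes or is suspended and checkpointed.
    synchronized void fiberDone() {
        count--;
        notifyAll(); // wake any thread blocked in awaitAtMost
    }

    // Blocks until at most `allowed` fibers are still running, then returns the count.
    synchronized int awaitAtMost(int allowed) throws InterruptedException {
        while (count > allowed) {
            wait();
        }
        return count;
    }

    // Starts `fibers` short-lived worker threads, then waits as a stop call might.
    static int stopAfterStarting(int fibers, int allowed) {
        UnfinishedFiberCounter counter = new UnfinishedFiberCounter();
        for (int i = 0; i < fibers; i++) {
            counter.fiberStarted();
            new Thread(() -> {
                try {
                    Thread.sleep(20); // simulated work before the fiber finishes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                counter.fiberDone();
            }).start();
        }
        try {
            return counter.awaitAtMost(allowed);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints "fibers still running after stop: 0"
        System.out.println("fibers still running after stop: " + stopAfterStarting(3, 0));
    }
}
```

With a non-zero allowance the wait returns as soon as the count drops to that threshold, which is why such a parameter is mostly useful in tests that deliberately leave a fiber unsuspended.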
public kotlin.Pair<java.util.List,rx.Observable> track()
Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to changes and calls to allStateMachines
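The need for an atomic snapshot plus subscription can be shown with a small stand-alone sketch. It assumes a lock-based design and uses hypothetical names (`TrackedRegistry`, a `Consumer` callback in place of an rx.Observable); it is not the implementation behind track. Because the snapshot copy and the subscriber registration happen in one critical section, each item is seen exactly once, either in the snapshot or as a later update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical registry; the names and the callback type are illustrative only.
class TrackedRegistry {
    private final Object lock = new Object();
    private final List<String> items = new ArrayList<>();
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Adds an item and notifies subscribers while holding the lock.
    void add(String item) {
        synchronized (lock) {
            items.add(item);
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    // Registers the subscriber and copies the current items in one critical section,
    // so no add() can slip in between the two steps.
    List<String> track(Consumer<String> subscriber) {
        synchronized (lock) {
            subscribers.add(subscriber);
            return new ArrayList<>(items);
        }
    }

    static String demo() {
        TrackedRegistry registry = new TrackedRegistry();
        registry.add("a");                                // present before tracking starts
        List<String> updates = new ArrayList<>();
        List<String> snapshot = registry.track(updates::add);
        registry.add("b");                                // arrives after tracking starts
        return "snapshot=" + snapshot + " updates=" + updates;
    }

    public static void main(String[] args) {
        // prints "snapshot=[a] updates=[b]"
        System.out.println(demo());
    }
}
```

If the snapshot and the subscription were two separate calls, an item added between them would appear in neither result.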
public <T> FlowStateMachine<T> add(FlowLogic<? extends T> logic)
Kicks off a brand new state machine of the given class. The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is restarted with checkpointed state machines in the storage service.
Note that you must be on the executor thread.
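The executor-thread requirement can be pictured with the stand-alone sketch below. It assumes the requirement is enforced by an explicit check, which this page does not confirm; `ThreadAffinitySketch`, `checkOnThread` and `addFromAnyThread` are hypothetical names, and the `add` here is a stand-in that only returns a string.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of an "executor thread only" rule using plain JDK classes.
class ThreadAffinitySketch {
    private final AtomicReference<Thread> executorThread = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "smm-executor");
        executorThread.set(thread); // remember which thread owns the executor
        return thread;
    });

    // Throws if the caller is not running on the executor's thread.
    void checkOnThread() {
        if (Thread.currentThread() != executorThread.get()) {
            throw new IllegalStateException("must be called on the executor thread");
        }
    }

    // Stand-in for add: only legal on the executor thread.
    String add(String logic) {
        checkOnThread();
        return "started " + logic;
    }

    // Safe entry point for other threads: hop onto the executor, then call add.
    String addFromAnyThread(String logic) {
        Callable<String> task = () -> add(logic);
        try {
            return executor.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static String demo() {
        ThreadAffinitySketch sketch = new ThreadAffinitySketch();
        String direct;
        try {
            direct = sketch.add("PaymentFlow"); // called from the wrong thread
        } catch (IllegalStateException e) {
            direct = "direct call rejected";
        }
        String viaExecutor = sketch.addFromAnyThread("PaymentFlow");
        sketch.executor.shutdown();
        return direct + "; " + viaExecutor;
    }

    public static void main(String[] args) {
        // prints "direct call rejected; started PaymentFlow"
        System.out.println(demo());
    }
}
```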
public ServiceHubInternal getServiceHub()
public CheckpointStorage getCheckpointStorage()
public AffinityExecutor getExecutor()
public org.jetbrains.exposed.sql.Database getDatabase()