class StateMachineManager
A StateMachineManager is responsible for coordination and persistence of multiple ProtocolStateMachine objects. Each such object represents an instantiation of a (two-party) protocol that has reached a particular point.
An implementation of this class will persist state machines to long-term storage so they can survive process restarts. If run with a single-threaded executor, it also ensures that no two state machines run concurrently with each other (bad for performance, good for programmer mental health).
A "state machine" is a class with a single call method. The call method and any others it invokes are rewritten by a bytecode rewriting engine called Quasar, to ensure the code can be suspended and resumed at any point.
The SMM will always invoke the protocol fibers on the given AffinityExecutor, regardless of which thread actually starts them via add.
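The serialising effect of a single-threaded executor described above can be illustrated with a minimal sketch. It uses a plain `java.util.concurrent` executor as a stand-in; the real `AffinityExecutor` and Quasar fibers are not involved, and `maxObservedConcurrency` is a hypothetical helper invented for this illustration. Tasks are submitted from several caller threads, yet they all run one at a time on the executor's own thread.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: submits taskCount tasks from separate threads and
// reports the highest number of tasks that were ever running at once.
fun maxObservedConcurrency(taskCount: Int): Int {
    // Stand-in for the manager's executor (assumption: NOT the real AffinityExecutor).
    val executor = Executors.newSingleThreadExecutor()
    val running = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)

    // Each submitter thread plays the role of an arbitrary caller of add().
    val submitters = (1..taskCount).map {
        Thread {
            executor.execute {
                val now = running.incrementAndGet()
                maxSeen.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                Thread.sleep(5) // simulate some work
                running.decrementAndGet()
            }
        }
    }
    submitters.forEach { it.start() }
    submitters.forEach { it.join() }

    // Let the queued tasks drain before reading the result.
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
    return maxSeen.get()
}

fun main() {
    println(maxObservedConcurrency(8)) // prints 1
}
```

Swapping in a multi-threaded pool (for example `Executors.newFixedThreadPool(4)`) would typically let the observed value rise above 1, which is the trade-off between throughput and ease of reasoning that the description mentions.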
TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised continuation is always unique?
TODO: Think about how to bring the system to a clean stop so it can be upgraded without any serialised stacks on disk.
TODO: Timeouts.
TODO: Surfacing of exceptions via an API and/or management UI.
TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt.
TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
Change |
data class Change |
ExistingSessionMessage |
interface ExistingSessionMessage : SessionMessage |
FiberScheduler |
inner class FiberScheduler |
ProtocolSession |
data class ProtocolSession |
SessionConfirm |
data class SessionConfirm : SessionInitResponse |
SessionData |
data class SessionData : ExistingSessionMessage |
SessionEnd |
data class SessionEnd : ExistingSessionMessage |
SessionInit |
data class SessionInit : SessionMessage |
SessionInitResponse |
interface SessionInitResponse : ExistingSessionMessage |
SessionMessage |
interface SessionMessage |
SessionReject |
data class SessionReject : SessionInitResponse |
<init> |
StateMachineManager(serviceHub: ServiceHubInternal, tokenizableServices: List<Any>, checkpointStorage: CheckpointStorage, executor: AffinityExecutor, database: <ERROR CLASS>) |
allStateMachines |
val allStateMachines: List<ProtocolLogic<*>> |
changes |
val changes: <ERROR CLASS><Change> An observable that emits triples of the changing protocol, the type of change, and a process-specific ID number which may change across restarts. |
checkpointStorage |
val checkpointStorage: CheckpointStorage |
database |
val database: <ERROR CLASS> |
executor |
val executor: AffinityExecutor |
scheduler |
val scheduler: FiberScheduler |
serviceHub |
val serviceHub: ServiceHubInternal |
add |
fun <T> add(logic: ProtocolLogic<T>): ProtocolStateMachine<T> Kicks off a brand new state machine of the given class. The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is restarted with checkpointed state machines in the storage service. |
findStateMachines |
fun <P : ProtocolLogic<T>, T> findStateMachines(protocolClass: Class<P>): List<<ERROR CLASS><P, <ERROR CLASS><T>>> Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count). |
start |
fun start(): Unit |
stop |
fun stop(allowedUnsuspendedFiberCount: Int = 0): Unit Start the shutdown process, bringing the StateMachineManager to a controlled stop. When this method returns, all Fibers have been suspended and checkpointed, or have completed. |
track |
fun track(): <ERROR CLASS><List<ProtocolStateMachineImpl<*>>, <ERROR CLASS><Change>> Atomically takes a snapshot and subscribes to changes. This is needed so we don't miss updates between subscribing to changes and calling allStateMachines. |
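
The reason `track` combines the snapshot and the subscription can be shown with a small sketch. This is not the manager's implementation: `TrackedRegistry` and its callback-based API are hypothetical, and a plain lock stands in for whatever mechanism the real class uses. The point is only that taking the snapshot and registering the listener under the same lock leaves no window in which an update can be lost or delivered twice.

```kotlin
// Hypothetical registry illustrating "atomic snapshot + subscribe".
class TrackedRegistry<T> {
    private val lock = Any()
    private val items = ArrayList<T>()
    private val listeners = ArrayList<(T) -> Unit>()

    // Adds an item and notifies every listener registered so far.
    // (A production version would likely notify outside the lock.)
    fun add(item: T) {
        synchronized(lock) {
            items.add(item)
            listeners.forEach { it(item) }
        }
    }

    // Registers the listener and returns a copy of the current contents
    // in one critical section, so no add() can slip in between the two steps.
    fun track(onAdded: (T) -> Unit): List<T> = synchronized(lock) {
        listeners.add(onAdded)
        items.toList()
    }
}

fun main() {
    val registry = TrackedRegistry<String>()
    registry.add("a")

    val seen = ArrayList<String>()
    val snapshot = registry.track { seen.add(it) }
    registry.add("b")

    println(snapshot) // prints [a]
    println(seen)     // prints [b]
}
```

If the snapshot and the subscription were two separate calls, an item added between them would appear in neither the snapshot nor the stream of updates (or in both, depending on the order of the calls).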