corda/docs/source/contributing-flow-internals.rst
josecoll 99e802c5c5 CORDA-3071 - Checkpoint agent tool (#5295)
* Initial commit based on experimental kryo hook agent.

* WIP

* Added documentation.

* Additional improvements and documentation following more testing.

* Added field level instrumentation + basic type handlers for String, byteArray, charArray, primitive types.

* Working version (without array type handling)

* Missing build.gradle file.

* Handle display of Arrays and String.
Pruning output to avoid repetition (by loop depth, object count).

* Added configurable StackDepth (for display purposes) and filter out ProgressTracker stacks.

* Further array handling (Object arrays, 2D, 3D), improved display and general code cleanup.

* Various fixes and improvements following demo to RP.

* Clean-up

* Further clean-up

* Set checkpoint id before deserialization.

* Update documentation

* Final clean-up.

* Minor documentation fixes.

* Updates following PR review feedback.

* Add changelog entry.
2019-08-01 09:40:52 +01:00

34 KiB

Flow framework internals

Quasar

Instrumentation

Quasar rewrites bytecode to achieve a couple of things:

  1. Collect the contents of the execution stack. It uses thread-local datastructures for this.
  2. Create a way to jump from suspension points. It uses an un-catchable exception throw for this.
  3. Create a way to jump into suspension points. It uses a sequence of switch statements for this.

To this end Quasar transforms the JVM bytecode of @Suspendable functions. Take the following as an example:

@Suspendable
fun s0(a): Int {
  val b = ..
  n1()
  s1()

  for (..) {
    val c = ..
    n2()
    s2(b)
  }

  n3()
  s3()

  return 1;
}

fun n1() { .. }
fun n2() { .. }
fun n3() { .. }
fun n4() { .. }
@Suspendable fun s1() { .. }
@Suspendable fun s2(b) { .. }
@Suspendable fun s3() { .. }

Quasar's javaagent, when loading bytecode, will look for functions with the @Suspendable annotation. Furthermore within these functions it will look for callsites of other @Suspendable functions (which is why it's important to annotate interface/abstract class methods as well as implementations). Note how n1-n4 are thus not instrumented, and their callsites aren't relevant in the instrumentation of s0.

Disregarding any potential optimizations, quasar will then do the following transformation of s0:

Note

The following code is pseudo-Kotlin code and includes non-existent constructs like arbitrary code labels and goto.

// Quasar uses this annotation to store some metadata about the function, and to check whether the function has been instrumented already
@Instrumented
fun s0(a): Int {
  // A variable to temporarily store s0's return value later on
  var __return = null
  // A variable to indicate whether we are being resumed (we are jumping into a suspension's continuation), or this is a "regular" call.
  lateinit var __resumed: Boolean
  // Retrieve the Quasar-internal execution stack, which is stored in a thread-local.
  var __stack = co.paralleluniverse.fibers.Stack.getStack()
  if (__stack == null) {
    // There's no stack, execute as a regular function
    goto lMethodStart
  }

  // We are being resumed, we are jumping into the suspension point
  __resumed = true
  // Retrieve the integer that indicates which part of this function we should be jumping into, stored in a thread-local.
  val __entry = co.paralleluniverse.fibers.Stack.nextMethodEntry()
  when (__entry) {
    0 -> {
      TODO
    }
    1 -> {
      TODO
    }
    2 -> {
      TODO
    }
    else -> {
      // The entry value is not recognized, the function may be called in a non-suspending capacity.
      val __isFunctionCalledAsSuspendable = co.paralleluniverse.fibers.Stack.isFirstInStackOrPushed()
      if (_isFunctionCalledAsSuspendable) {
        goto lMethodStart
      }
      __stack = null
    }
  }

  // The first code block, starting from the original non-transformed function start.
  lMethodStart:
  // This try-catch handles the Quasar-specific SuspendExecution exception. Quasar prevents the catching of this exception in user code.
  try {
    __resumed = false
    val b = ..
    TODO
  } catch (e: SuspendExecution {
    TODO
  }
}

Note

The Quasar javaagent code doing the above rewriting can be found here. Note that only the main parts of the instrumentation are shown above, the actual transformation is more complex and involves handling corner cases and optimizations.

Fibers

The above instrumentation allows the implementation of co-operative scheduling. That is, @Suspendable code can yield its execution by throwing a SuspendExecution exception. This exception throw takes care of handing the control flow to a top-level try-catch, which then has access to the thread-locally constructed execution stack, as well as a way to return to the suspension point using the "method entry" list.

A Fiber thus is nothing more than a data structure holding the execution stack, the method entry list, as well as various bookkeeping data related to the management of the Fiber, e.g. its state enum or identifier.

The main try-catch that handles the yielding may be found here.

Note

For those adventurous enough to explore the implementation, the execution stack and method entry list are merged into two growing arrays in Stack, one holding Object s (dataObject, for structured objects), the other holding long s (dataLong, for primitive values). The arrays always have the same length, and they both contain values for each stack frame. The primitive stack additionally has a "metadata" slot for each stack frame, this is where the "method entry" value is put, as well as frame size data.

Checkpoints

The main idea behind checkpoints is to utilize the Fiber data structure and treat it as a serializable object capturing the state of a running computation. Whenever a Corda-suspendable API is hit, we capture the execution stack and corresponding entry list, and serialize it using Kryo, a reflection-based serialization library capable of serializing unstructured data. We thus get a handle to an arbitrary suspended computation.

In the flow state machine there is a strict separation of the user-code's state, and the flow framework's internal state. The former is the serialized Fiber, and the latter consists of structured objects.

The definition of a Checkpoint can be found here.

The "user state" can be found in FlowState. It is either

  1. Unstarted: in this case there's no Fiber to serialize yet, we serialize the FlowLogic instead.
  2. Started: in this case the flow has been started already, and has been suspended on some IO. We store the FlowIORequest and the serialized Fiber.

The rest of the Checkpoint deals with internal bookkeeping. Sessions, the subflow-stack, errors. Note how all data structures are read-only. This is deliberate, to enable easier reasoning. Any "modification" of the checkpoint therefore implies making a shallow copy.

The state machine

The internals of the flow framework were designed as a state machine. A flow is a strange event loop that has a state, and goes through state transitions triggered by events. The transitions may be either

  1. User transitions, when we hand control to user-defined code in the cordapp. This may transition to a suspension point, the end of the flow, or may abort exceptionally.
  2. Internal transitions, where we keep strict track of side-effects and failure conditions.

The core data structures of the state machine are:

  1. StateMachineState: this is the full state of the state machine. It includes the Checkpoint (the persisted part of the state), and other non-persisted state, most importantly the list of pending DeduplicationHandler s, to be described later.
  2. Event: Every state transition is triggered by one of these. These may be external events, notifying the state machine of something, or internal events, for example suspensions.
  3. Action: These are created by internal state transitions. These transitions do not inherently execute any side-effects, instead, they create a list of Action s, which are later executed.
  4. FlowContinuation: indicates how the state machine should proceed after a transition. It can resume to user code, throw an exception, keep processing events or abort the flow completely.

The state machine is a pure function that when given an Event and an initial StateMachineState returns the next state, a list of Action s to execute, and a FlowContinuation to indicate how to proceed:

// https://github.com/corda/corda/blob/c04a448bf391fb73f9b60cc41e8b5f0c23f81470/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionResult.kt#L15
data class TransitionResult(
        val newState: StateMachineState,
        val actions: List<Action> = emptyList(),
        val continuation: FlowContinuation = FlowContinuation.ProcessEvents
)

// https://github.com/corda/corda/blob/c04a448bf391fb73f9b60cc41e8b5f0c23f81470/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StateMachine.kt#L12
fun transition(event: Event, state: StateMachineState): TransitionResult

The top-level entry point for the state machine transitions is in TopLevelTransition.

As an example let's examine message delivery. This transition will be triggered by a DeliverSessionMessage event, defined like this:

data class DeliverSessionMessage(
        val sessionMessage: ExistingSessionMessage,
        override val deduplicationHandler: DeduplicationHandler,
        val sender: Party
) : Event(), GeneratedByExternalEvent

The event then goes through TopLevelTransition, which then passes it to DeliverSessionMessageTransition. This transition inspects the event, then does the relevant bookkeeping, updating sessions, buffering messages etc. Note that we don't do any checkpoint persistence, and we don't return control to the user code afterwards, we simply schedule a DoRemainingWork and return a ProcessEvents continuation. This means that it's going to be the next transition that decides whether the received message is "relevant" to the current suspension, and whether control should thus be returned to user code with the message.

FlowStateMachineImpl

The state machine is a pure function, so what is the "driver" of it, that actually executes the transitions and side-effects? This is what FlowStateMachineImpl is doing, which is a Fiber. This class requires great care when it's modified, as the programmer must be aware of what's on the stack, what fields get persisted as part of the Checkpoint, and how the control flow is wired.

The usual way to implement state machines is to create a simple event loop that keeps popping events from a queue, and executes the resulting transitions. With flows however this isn't so simple, because control must be returned to suspending operations. Therefore the eventloop is split up into several smaller eventloops, executed when "we get the chance", i.e. when users call API functions. Whenever the flow calls a Flow API function, control is handed to the flow framework, that's when we can process events, until a FlowContinuation indicates that control should be returned to user code.

There are two functions that aid the above:

  1. FlowStateMachineImpl.processEventsUntilFlowIsResumed: as the name suggests this is a loop that keeps popping and processing events from the flow's event queue, until a FlowContinuation.Resume or some continuation other than ProcessEvents is returned.
  2. FlowStateMachineImpl.processEventImmediately: this function skips the event queue and processes an event immediately. There are certain transitions (e.g. subflow enter/exit) that must be done this way, otherwise the event ordering can cause problems.

The two main functions that call the above are the top-level run, which is the entry point of the flow, and suspend, which every blocking API call eventually calls.

Suspensions

Let's take a look at suspend, which is the most delicate/brittle function in this class, and most probably the whole flow framework. Examining it will reveal a lot about how flows and fibers work.

@Suspendable
override fun <R : Any> suspend(ioRequest: FlowIORequest<R>, maySkipCheckpoint: Boolean): R {

First off, the type signature. We pass in a FlowIORequest<R>, which is an encapsulation of the IO action we're about to suspend on. It is a sealed class with members like Send/Receive/ExecuteAsyncOperation. It is serializable, and will be part of the Checkpoint. In fact, it is doubly-serialized, as it is in FlowState in a typed form, but is also present in the Fiber's stack, as a part of suspend's stack frame.

We also pass a maySkipCheckpoint boolean which if true will prevent the checkpoint from being persisted.

The function returns R, but the runtime control flow achieving this "return" is quite tricky. When the Fiber suspends a SuspendExecution exception will be thrown, and when the fiber is resumed this suspend function will be re-entered, however this time in a "different capacity", indicated by Quasar's implicitly stored method entry, which will jump to the end of the suspension. This is repeated several times as this function has two suspension points, one of them possibly executing multiple times, as we will see later.

val serializationContext = TransientReference(getTransientField(TransientValues::checkpointSerializationContext))
val transaction = extractThreadLocalTransaction()

These lines extract some data required for the suspension. Note that both local variables are TransientReference s, which means the referred-to object will not be serialized as part of the stack frame. During resumption from a deserialized checkpoint these local variables will thus be null, however at that point these objects will not be required anymore.

The first line gets the serialization context from a TransientValues datastructure, which is where all objects live that are required for the flow's functioning but which we don't want to persist. This means all of these values must be re-initialized each time we are restoring a flow from a persisted checkpoint.

parkAndSerialize { _, _ ->

This is the Quasar API that does the actual suspension. The passed in lambda will not be executed in the current suspend frame, but rather is stored temporarily in the internal Fiber structure, and will be run in the outer Quasar try-catch as a "post park" action after the catch of the SuspendExecution exception. See Fiber.java for details.

This means that within this lambda the Fiber will have already technically parked, but it hasn't yet properly yielded to the enclosing scheduler.

setLoggingContext()

Thread-locals are treated in a special way when Quasar suspends/resumes. Through use of reflection and JDK-internal unsafe operations it accesses all ThreadLocals in the current thread and swaps them with ones stored in the Fiber data structure. In essence for each thread that executes as a Fiber we have two sets of thread locals, one set belongs to the original "non-Quasar" thread, and the other belongs to the Fiber. During Fiber execution the latter is active, this is swapped with the former during suspension, and swapped back during resume. Note that during resume these thread-locals may actually be restored to a different thread than the original.

In the parkAndSerialize closure the Fiber is partially parked, and at this point the thread locals are already swapped out. This means that data stored in ThreadLocal s that we still need must be re-initialized somehow. In the above case this is the logging MDC.

// Will skip checkpoint if there are any idempotent flows in the subflow stack.
val skipPersistingCheckpoint = containsIdempotentFlows() || maySkipCheckpoint

contextTransactionOrNull = transaction.value
val event = try {
    Event.Suspend(
            ioRequest = ioRequest,
            maySkipCheckpoint = skipPersistingCheckpoint,
            fiber = this.checkpointSerialize(context = serializationContext.value)
    )
} catch (exception: Exception) {
    Event.Error(exception)
}

A couple of things happen here. First we determine whether this suspension's subflow stack contains an IdempotentFlow, to determine whether to skip checkpoints. An idempotent flow is a subflow that's safe to replay from the beginning. This means that no checkpoint will be persisted during its execution, as replaying from the previous checkpoint should yield the same results semantically. As an example the notary client flow is an IdempotentFlow, as notarisation is idempotent, and may be safely replayed.

We then set another thread-local, the database transaction, which was also swapped out during the park, and we made it available to the closure temporarily using a TransientReference earlier. The database transaction is used during serialization of the fiber and persistence of the checkpoint.

We then create the Suspend event, which includes the IO request and the serialized Fiber. If there's an exception during serialization we create an Error event instead. Note how every condition, including error conditions are treated as "normal control flow" in the state machine, we must be extra careful as these conditions are also exposed to the user and are part of our API guarantees.

// We must commit the database transaction before returning from this closure otherwise Quasar may schedule
// other fibers, so we process the event immediately
val continuation = processEventImmediately(
        event,
        isDbTransactionOpenOnEntry = true,
        isDbTransactionOpenOnExit = false
)
require(continuation == FlowContinuation.ProcessEvents){"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation "}

We first process the suspension event ASAP, as we must commit the underlying database transaction before the closure ends.

Note

The call to processEventImmediately here reveals why the transition execution is structured in such an unintuitive way, why we are not simply using an event loop. In an earlier iteration of the flow framework a separate thread pool was handling events and state transitions, the state machine transitions' execution was completely offloaded, and the Fiber itself was only concerned with the execution of user code and creation of suspension events.

However later it turned out that under any considerable load this structuring results in heavy resource leakage, and in the case of database transactions, deadlocks. The reason for this is simply that resource management is often tied to thread lifetime, for example in the case of serialization buffers, network buffers, database connections. Quasar multiplexes threads across many many more Fibers, however this also explodes thread-bound resources allocated/retained, which are now Fiber-bound. This means that if we want to take advantage of Quasar's green threading we must make sure to release any thread-local resources before yielding, otherwise we will leak.

To give a specific example, if we processed the above Suspend event in another thread or even just after this closure, the underlying database connection would leak through a proper Fiber yield, meaning it would not be closed until the Fiber is scheduled again or until the processing thread picks it up and closes it. In the case of database transactions we use Hikari to pool the connections, which means that the flow framework would quickly exhaust the connection pool, which would thus cause a proper thread block of the Fiber-executing threads trying to acquire a connection. This in turn means there would be absolutely no chance of the fibers retaining the connections getting scheduled again, effectively deadlocking the executor threadpool.

unpark(SERIALIZER_BLOCKER)
}
return uncheckedCast(processEventsUntilFlowIsResumed(
    isDbTransactionOpenOnEntry = false,
    isDbTransactionOpenOnExit = true
))

As the last step in the park closure we unpark the Fiber we are currently parking. This effectively causes an "immediate" re-enter of the fiber, and therefore the suspend function, but this time jumping over the park and executing the next statement. Of course this re-enter may happen much later, perhaps even on a different thread.

We then enter a mini event-loop, which also does Quasar yields, processing the flow's event queue until a transition continuation indicates that control can be returned to user code . Practically this means that when a flow is waiting on an IO action it won't actually be blocked in the parkAndSerialize call, but rather in this event loop, popping from the event queue.

Note

The processEvent* calls do explicit checks of database transaction state on entry and exit. This is because Quasar yields make reasoning about resource usage difficult, as they detach resource lifetime from lexical scoping, or in fact any other scoping that programmers are used to. These internal checks ensure that we are aware of which code blocks have a transaction open and which ones don't. Incidentally these checks also seem to catch instrumentation/missing @Suspendable-annotation problems.

Event processing

The processing of an event consists of two steps:

  1. Calculating a transition. This is the pure StateMachineState + Event -> TransitionResult function.
  2. Executing the transition. This is done by a TransitionExecutor, which in turn uses an ActionExecutor for individual Actions.

This structuring allows the introspection and interception of state machine transitions through the registering of TransitionExecutor interceptors. These interceptors are TransitionExecutor s that have access to a delegate. When they receive a new transition they can inspect it, pass it to the delegate, and do something specific to the interceptor.

For example checkpoint deserializability is checked by such an interceptor. It inspects a transition, and if it contains a Fiber checkpoint then it checks whether it's deserializable in a separate thread.

The transition calculation is done in the net.corda.node.services.statemachine.transitions package, the top-level entry point being TopLevelTransition. There is a TransitionBuilder helper that makes the transition definitions a bit more readable. It contains a currentState field that may be updated with new StateMachineState instances as the event is being processed, and has some helper functions for common functionality, for example for erroring the state machine with some error condition.

Here are a couple of highlighted transitions:

Suspend

Handling of Event.Suspend is quite straightforward and is done here. We take the serialized Fiber and the IO request and create a new checkpoint, then depending on whether we should persist or not we either simply commit the database transaction and schedule a DoRemainingWork (to be explained later), or we persist the checkpoint, run the DeduplicationHandler inside-tx hooks, commit, then run the after-tx hooks, and schedule a DoRemainingWork.

Every checkpoint persistence implies the above steps, in this specific order.

DoRemainingWork

This is a generic event that simply tells the state machine: inspect your current state, and decide what to do, if anything. Using this event we can break down transitions into a <modify state> and <inspect and do stuff> transition, which compose well with other transitions, as we don't need to add special cases everywhere in the state machine.

As an example take error propagation. When a flow errors it's put into an "errored" state, and it's waiting for further instructions. One possibility is the triggering of error propagation through the scheduling of Event.StartErrorPropagation. Note how the handling of this event simply does the following:

currentState = currentState.copy(
        checkpoint = currentState.checkpoint.copy(
                errorState = errorState.copy(propagating = true)
        )
)
actions.add(Action.ScheduleEvent(Event.DoRemainingWork))

It marks the error state as propagating = true and schedules a DoRemainingWork. The processing of that event in turn will detect that we are errored and propagating, and there are some errors that haven't been propagated yet. It then propagates those errors and updates the "propagated index" to indicate all errors have been dealt with. Subsequent DoRemainingWorks will thus do nothing. However, in case some other error condition or external event adds another error to the flow, we would automatically propagate that too, we don't need to write a special case for it.

Most of the state machine logic is therefore about the handling DoRemainingWork. Another example is resumptions due to an IO request completing in some way. DoRemainingWork checks whether we are currently waiting for something to complete e.g. a FlowIORequest.Receive. It then checks whether the state contains enough data to complete the action, in the receive case this means checking the relevant sessions for buffered messages, and seeing whether those messages are sufficient to resume the flow with.

Transition execution

Once the transition has been calculated the transition is passed to the flow's TransitionExecutor. The main executor is TransitionExecutorImpl, which executes the transition's Action s, and handles errors by manually erroring the flow's state. This is also when transition interceptors are triggered.

Errors

An error can manifest as either the whole flow erroring, or a specific session erroring. The former means that the whole flow is blocked from resumption, and it will end up in the flow hospital. A session erroring blocks only that specific session. Any interaction with this session will in turn error the flow. Session errors are created by a remote party propagating an error to our flow.

How to modify the state machine

Let's say we wanted to change the session messaging protocol. How would we go about changing the state machine?

The session logic is defined by

  1. Session message definitions, see the SessionMessage sealed class.
  2. Session state definitions, see the SessionState sealed class. This is the state we store per established/to-be-established session with a Party.
  3. Session state transitions, see DeliverSessionMessageTransition.

Let's say we wanted to add more handshake steps. To do this we need to add new types of SessionMessage s as required, new SessionState s, and cases to handle state transitions in DeliverSessionMessageTransition. This handles the receive path, to handle send paths StartedFlowTransition.sendTransition needs modifying, this is the transition triggered when the flow suspends on a send.

Atomicity

DeduplicationHandler

The flow framework guarantees atomicity of processing incoming events. This means that a flow or the node may be stopped at any time, even during processing of an event and on restart the node will reconstruct the correct state of the flows and will proceed as if nothing happened.

To do this each external event is given two hooks, one inside the database transaction committing the next checkpoint, and one after the commit, to enable implementation of exactly-once delivery on top of at-least-once. These hooks can be found on the DeduplicationHandler interface:

/**
 * This handler is used to implement exactly-once delivery of an external event on top of an at-least-once delivery. This is done
 * using two hooks that are called from the event processor, one called from the database transaction committing the
 * side-effect caused by the external event, and another one called after the transaction has committed successfully.
 *
 * For example for messaging we can use [insideDatabaseTransaction] to store the message's unique ID for later
 * deduplication, and [afterDatabaseTransaction] to acknowledge the message and stop retries.
 *
 * We also use this for exactly-once start of a scheduled flow, [insideDatabaseTransaction] is used to remove the
 * to-be-scheduled state of the flow, [afterDatabaseTransaction] is used for cleanup of in-memory bookkeeping.
 *
 * It holds a reference back to the causing external event.
 */
interface DeduplicationHandler {
    /**
     * This will be run inside a database transaction that commits the side-effect of the event, allowing the
     * implementor to persist the event delivery fact atomically with the side-effect.
     */
    fun insideDatabaseTransaction()

    /**
     * This will be run strictly after the side-effect has been committed successfully and may be used for
     * cleanup/acknowledgement/stopping of retries.
     */
    fun afterDatabaseTransaction()

    /**
     * The external event for which we are trying to reduce from at-least-once delivery to exactly-once.
     */
    val externalCause: ExternalEvent
}

Let's take message delivery as an example. From the flow framework's perspective we are assuming at least once delivery, and in order delivery. When a message is received a corresponding DeduplicationHandler is created. The hook inside the database transaction persists the message ID, and the hook after acknowledges it, stopping potential retries. If the node crashes before the transaction commits then the message will be redelivered, and if it crashes after it will be deduplicated based on the ID table.

We also use this for deduplicating scheduled flow starts, the inside hook removes the scheduled StateRef, and the after hook cleans up in-memory bookkeeping.

We could also use this for deduplicating RPC flow starts. A deduplication ID would be generated (and potentially stored) on the client, persisted on the node in the inside-tx hook, and the start would be acked afterwards, removing the ID from the client (and stopping retries).

Internally a list of pending DeduplicationHandler s is accumulated in the state machine in StateMachineState. When the next checkpoint is persisted the corresponding insideDatabaseTranscation hooks are run, and once the checkpoint is committed the afterDatabaseTransaction hooks are run.

In-memory flow retries

Tracking of these handlers also allows us to do in-memory retries of flows. To do this we need to re-create the flow from the last checkpoint and retry external events internally. For every flow we have two lists of such "events", one is the yet-unprocessed event queue of the flow, and one is the already processed but still pending list of DeduplicationHandler s. The concatenation of these events gives us a handle on the list of events relevant to the flow since the last persisted checkpoint, so we just need to re-process these events. All of these events go through the StateMachineManager, which is where the retry is handled too.

Note

There may be cases where there is no checkpoint yet for a flow that needs retrying. In this case the re-processing of the events is sufficient, as one of those events will be the starting of the flow, or a delivery of a flow initiation message. So it all works out!

Deduplication

Full message deduplication is more complex, what we've discussed so far only dealt with the state machine bits.

When we receive a message from Artemis it is eventually handled by P2PMessagingClient.deliver, which consults the P2PDeduplicator class to determine whether the message is a duplicate. P2PDeduplicator holds two data structures:

  1. processedMessages: the persisted message ID table. Any message ID in this table must have been committed together with a checkpoint that includes the side-effects caused by the message.
  2. beingProcessedMessages: an in-memory map holding the message IDs until they are being processed and committed.

These two data structures correspond to the two DeduplicationHandler hooks of each message. insideDatabaseTransaction adds to the processedMessages map, afterDatabaseTransaction removes from beingProcessedMessages

The indirection through the in-memory map is needed because Artemis may redeliver unacked messages in certain situations, and at that point the message may still be "in-flight", i.e. the ID may not be committed yet.

If the message isn't a duplicate then it's put into beingProcessedMessages and forwarded to the state machine manager, which then forwards it to the right flow or constructs one if this is an initiating message. When the next checkpoint of the relevant flow is persisted the message is "finalized" as discussed, using its DeduplicationHandler.

Flow hospital

The flow hospital is a place where errored flows end up. This is done using an interceptor that detects error transitions and notifies the hospital.

The hospital can decide what to do with the flow: restart it from the last persisted checkpoint using an in-memory retry, keep the flow around pending either manual intervention or a restart of the node (in which case it will be retried from the last persisted checkpoint on start), or trigger error propagation, which makes the error permanent and notifies other parties the flow has sessions with of the failure.

This is where we can do special logic to handle certain error conditions like notary failures in a specific way e.g. by retrying.