ENT-5397 Pause individual running and hospitalised flows (#3564)

Added a newpause event to the statemachine which returns an Abort
continuation and causes the flow to be moved into the Paused flow Map.

Flows can receive session messages whilst paused.
This commit is contained in:
Will Vigor 2020-08-05 15:39:23 +01:00
parent bbf5a93761
commit 4a828fcb99
8 changed files with 159 additions and 38 deletions

View File

@ -57,6 +57,11 @@ sealed class Action {
*/
data class PersistCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint, val isCheckpointUpdate: Boolean) : Action()
/**
* Update only the [status] of the checkpoint with [id].
*/
data class UpdateFlowStatus(val id: StateMachineRunId, val status: Checkpoint.FlowStatus): Action()
/**
* Remove the checkpoint corresponding to [id].
*/
@ -106,6 +111,11 @@ sealed class Action {
val lastState: StateMachineState
) : Action()
/**
* Move the flow corresponding to [flowId] to paused.
*/
data class MoveFlowToPaused(val currentState: StateMachineState) : Action()
/**
* Schedule [event] to self.
*/

View File

@ -67,6 +67,8 @@ internal class ActionExecutorImpl(
is Action.RetryFlowFromSafePoint -> executeRetryFlowFromSafePoint(action)
is Action.ScheduleFlowTimeout -> scheduleFlowTimeout(action)
is Action.CancelFlowTimeout -> cancelFlowTimeout(action)
is Action.MoveFlowToPaused -> executeMoveFlowToPaused(action)
is Action.UpdateFlowStatus -> executeUpdateFlowStatus(action)
}
}
private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) {
@ -99,6 +101,11 @@ internal class ActionExecutorImpl(
}
}
@Suspendable
private fun executeUpdateFlowStatus(action: Action.UpdateFlowStatus) {
checkpointStorage.updateStatus(action.id, action.status)
}
@Suspendable
private fun executePersistDeduplicationIds(action: Action.PersistDeduplicationFacts) {
for (handle in action.deduplicationHandlers) {
@ -191,6 +198,11 @@ internal class ActionExecutorImpl(
stateMachineManager.removeFlow(action.flowId, action.removalReason, action.lastState)
}
@Suspendable
private fun executeMoveFlowToPaused(action: Action.MoveFlowToPaused) {
stateMachineManager.moveFlowToPaused(action.currentState)
}
@Suspendable
@Throws(SQLException::class)
private fun executeCreateTransaction() {

View File

@ -186,6 +186,13 @@ sealed class Event {
*/
data class TerminateSessions(val sessions: Set<SessionId>) : Event()
/**
* Pause the flow.
*/
object Pause: Event() {
override fun toString() = "Pause"
}
/**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list.

View File

@ -29,11 +29,11 @@ import java.util.concurrent.Semaphore
class Flow<A>(val fiber: FlowStateMachineImpl<A>, val resultFuture: OpenFuture<Any?>)
class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint) {
val externalEvents = mutableListOf<Event.DeliverSessionMessage>()
class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint, val resultFuture: OpenFuture<Any?> = openFuture()) {
val events = mutableListOf<ExternalEvent>()
fun addExternalEvent(message: Event.DeliverSessionMessage) {
externalEvents.add(message)
fun addExternalEvent(message: ExternalEvent) {
events.add(message)
}
}
@ -66,18 +66,18 @@ class FlowCreator(
}
else -> nonResidentFlow.checkpoint
}
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint)
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture)
}
fun createFlowFromCheckpoint(
runId: StateMachineRunId,
oldCheckpoint: Checkpoint,
reloadCheckpointAfterSuspendCount: Int? = null,
lock: Semaphore = Semaphore(1)
lock: Semaphore = Semaphore(1),
resultFuture: OpenFuture<Any?> = openFuture()
): Flow<*>? {
val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null
val resultFuture = openFuture<Any?>()
fiber.logic.stateMachine = fiber
verifyFlowLogicIsSuspendable(fiber.logic)
fiber.transientValues = createTransientValues(runId, resultFuture)
@ -219,4 +219,4 @@ class FlowCreator(
lock = lock
)
}
}
}

View File

@ -3,6 +3,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.instrument.JavaAgent
import co.paralleluniverse.strands.channels.Channel
import com.codahale.metrics.Gauge
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.concurrent.CordaFuture
@ -404,22 +405,22 @@ internal class SingleThreadedStateMachineManager(
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
if (currentState.checkpoint.status == Checkpoint.FlowStatus.HOSPITALIZED) {
database.transaction {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
}
}
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val checkpoint = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return@transaction null
}
tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
}
} ?: return@transaction null
} ?: return
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: return
// Resurrect flow
flowCreator.createFlowFromCheckpoint(
flowId,
@ -443,17 +444,56 @@ internal class SingleThreadedStateMachineManager(
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState)
}
}
/**
* Extract all the [ExternalEvent] from this flows event queue and queue them (in the correct order) in the PausedFlow.
* This differs from [extractAndScheduleEventsForRetry] which also extracts (and schedules) [Event.Pause]. This means that if there are
* more events in the flows eventQueue then the flow won't pause again (after it is retried). These events are then scheduled (along
* with any [ExistingSessionMessage] which arrive in the interim) when the flow is retried.
*/
private fun extractAndQueueExternalEventsForPausedFlow(
currentEventQueue: Channel<Event>,
currentPendingDeduplicationHandlers: List<DeduplicationHandler>,
pausedFlow: NonResidentFlow
) {
pausedFlow.events += currentPendingDeduplicationHandlers.map{it.externalCause}
do {
val event = currentEventQueue.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
pausedFlow.events.add(event.deduplicationHandler.externalCause)
}
} while (event != null)
}
/**
* Extract all the incomplete deduplication handlers as well as the [ExternalEvent] and [Event.Pause] events from this flows event queue
* [oldEventQueue]. Then schedule them (in the same order) for the new flow. This means that if a retried flow has a pause event
* scheduled then the retried flow will eventually pause. The new flow will not retry again if future retry events have been scheduled.
* When this method is called this flow must have been replaced by the new flow in [StateMachineInnerState.flows]. This method differs
* from [extractAndQueueExternalEventsForPausedFlow] where (only) [externalEvents] are extracted and scheduled straight away.
*/
private fun extractAndScheduleEventsForRetry(oldEventQueue: Channel<Event>, currentState: StateMachineState) {
val flow = innerState.withLock {
flows[currentState.flowLogic.runId]
}
val events = mutableListOf<Event>()
do {
val event = oldEventQueue.tryReceive()
if (event is Event.Pause || event is Event.GeneratedByExternalEvent) events.add(event)
} while (event != null)
for (externalEvent in currentState.pendingDeduplicationHandlers) {
deliverExternalEvent(externalEvent.externalCause)
}
for (event in events) {
if (event is Event.GeneratedByExternalEvent) {
deliverExternalEvent(event.deduplicationHandler.externalCause)
} else {
flow?.fiber?.scheduleEvent(event)
}
}
}
@ -492,7 +532,7 @@ internal class SingleThreadedStateMachineManager(
val sender = serviceHub.networkMapCache.getPeerByLegalName(peer)
if (sender != null) {
when (sessionMessage) {
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender)
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, sender, event)
is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event)
}
} else {
@ -502,8 +542,13 @@ internal class SingleThreadedStateMachineManager(
}
}
private fun onExistingSessionMessage(sessionMessage: ExistingSessionMessage, deduplicationHandler: DeduplicationHandler, sender: Party) {
private fun onExistingSessionMessage(
sessionMessage: ExistingSessionMessage,
sender: Party,
externalEvent: ExternalEvent.ExternalMessageEvent
) {
try {
val deduplicationHandler = externalEvent.deduplicationHandler
val recipientId = sessionMessage.recipientSessionId
val flowId = sessionToFlow[recipientId]
if (flowId == null) {
@ -522,7 +567,7 @@ internal class SingleThreadedStateMachineManager(
innerState.withLock {
flows[flowId]?.run { fiber.scheduleEvent(event) }
// If flow is not running add it to the list of external events to be processed if/when the flow resumes.
?: pausedFlows[flowId]?.run { addExternalEvent(event) }
?: pausedFlows[flowId]?.run { addExternalEvent(externalEvent) }
?: logger.info("Cannot find fiber corresponding to flow ID $flowId")
}
}
@ -639,7 +684,16 @@ internal class SingleThreadedStateMachineManager(
null
}
val flow = flowCreator.createFlowFromLogic(flowId, invocationContext, flowLogic, flowStart, ourIdentity, existingCheckpoint, deduplicationHandler, ourSenderUUID)
val flow = flowCreator.createFlowFromLogic(
flowId,
invocationContext,
flowLogic,
flowStart,
ourIdentity,
existingCheckpoint,
deduplicationHandler,
ourSenderUUID
)
val startedFuture = openFuture<Unit>()
innerState.withLock {
startedFutures[flowId] = startedFuture
@ -657,6 +711,26 @@ internal class SingleThreadedStateMachineManager(
flowTimeoutScheduler.cancel(flowId)
}
override fun moveFlowToPaused(currentState: StateMachineState) {
currentState.cancelFutureIfRunning()
flowTimeoutScheduler.cancel(currentState.flowLogic.runId)
innerState.withLock {
val id = currentState.flowLogic.runId
val flow = flows.remove(id)
if (flow != null) {
decrementLiveFibers()
//Setting flowState = FlowState.Paused means we don't hold the frozen fiber in memory.
val checkpoint = currentState.checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED, flowState = FlowState.Paused)
val pausedFlow = NonResidentFlow(id, checkpoint, flow.resultFuture)
val eventQueue = flow.fiber.transientValues.eventQueue
extractAndQueueExternalEventsForPausedFlow(eventQueue, currentState.pendingDeduplicationHandlers, pausedFlow)
pausedFlows.put(id, pausedFlow)
} else {
logger.warn("Flow $id already removed before pausing")
}
}
}
private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? {
return try {
serializedCheckpoint.deserialize(checkpointSerializationContext)

View File

@ -106,6 +106,7 @@ internal interface StateMachineManagerInternal {
fun addSessionBinding(flowId: StateMachineRunId, sessionId: SessionId)
fun removeSessionBindings(sessionIds: Set<SessionId>)
fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState)
fun moveFlowToPaused(currentState: StateMachineState)
fun retryFlowFromSafePoint(currentState: StateMachineState)
fun scheduleFlowTimeout(flowId: StateMachineRunId)
fun cancelFlowTimeout(flowId: StateMachineRunId)

View File

@ -63,6 +63,7 @@ class TopLevelTransition(
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
is Event.Pause -> pausedFlowTransition()
}
}
@ -368,6 +369,22 @@ class TopLevelTransition(
}
}
private fun pausedFlowTransition(): TransitionResult {
return builder {
if (!startingState.isFlowResumed) {
actions.add(Action.CreateTransaction)
}
actions.addAll(
arrayOf(
Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.PAUSED),
Action.CommitTransaction,
Action.MoveFlowToPaused(currentState)
)
)
FlowContinuation.Abort
}
}
private fun terminateSessionsTransition(event: Event.TerminateSessions): TransitionResult {
return builder {
val sessions = event.sessions

View File

@ -799,8 +799,8 @@ class DBCheckpointStorageTests {
val (extractedId, extractedCheckpoint) = checkpointStorage.getPausedCheckpoints().toList().single()
assertEquals(id, extractedId)
//We don't extract the result or the flowstate from a paused checkpoint
assertEquals(null, extractedCheckpoint.serializedFlowState)
assertEquals(null, extractedCheckpoint.result)
assertNull(extractedCheckpoint.serializedFlowState)
assertNull(extractedCheckpoint.result)
assertEquals(pausedCheckpoint.status, extractedCheckpoint.status)
assertEquals(pausedCheckpoint.progressStep, extractedCheckpoint.progressStep)