mirror of
https://github.com/corda/corda.git
synced 2025-02-21 01:42:24 +00:00
CORDA-3432 update structure of checkpoint class (#5983)
* Split StateMachine State into 2 classes The idea is this better reflects the database structure. Added a few helper methods to copy and update state. * Doc + Improve Checkpoint API * Rename methods to be more clear
This commit is contained in:
parent
13c52e4901
commit
8d1b6cf499
@ -52,7 +52,7 @@ object CheckpointVerifier {
|
||||
}
|
||||
|
||||
// For each Subflow, compare the checkpointed version to the current version.
|
||||
checkpoint.subFlowStack.forEach { checkFlowCompatible(it, cordappsByHash, platformVersion) }
|
||||
checkpoint.checkpointState.subFlowStack.forEach { checkFlowCompatible(it, cordappsByHash, platformVersion) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -211,7 +211,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
// Poke into Quasar's stack and find the object references to the sub-flows so that we can correctly get the current progress
|
||||
// step for each sub-call.
|
||||
val stackObjects = fiber.getQuasarStack()
|
||||
subFlowStack.map { it.toJson(stackObjects) }
|
||||
checkpointState.subFlowStack.map { it.toJson(stackObjects) }
|
||||
} else {
|
||||
emptyList()
|
||||
}
|
||||
@ -226,9 +226,9 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
timestamp,
|
||||
now
|
||||
),
|
||||
origin = invocationContext.origin.toOrigin(),
|
||||
ourIdentity = ourIdentity,
|
||||
activeSessions = sessions.mapNotNull { it.value.toActiveSession(it.key) },
|
||||
origin = checkpointState.invocationContext.origin.toOrigin(),
|
||||
ourIdentity = checkpointState.ourIdentity,
|
||||
activeSessions = checkpointState.sessions.mapNotNull { it.value.toActiveSession(it.key) },
|
||||
errored = errorState as? ErrorState.Errored
|
||||
)
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ data class DeduplicationId(val toString: String) {
|
||||
* message-id map to change, which means deduplication will not happen correctly.
|
||||
*/
|
||||
fun createForNormal(checkpoint: Checkpoint, index: Int, session: SessionState): DeduplicationId {
|
||||
return DeduplicationId("N-${session.deduplicationSeed}-${checkpoint.numberOfSuspends}-$index")
|
||||
return DeduplicationId("N-${session.deduplicationSeed}-${checkpoint.checkpointState.numberOfSuspends}-$index")
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -108,8 +108,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
*/
|
||||
override val logger = log
|
||||
override val resultFuture: CordaFuture<R> get() = uncheckedCast(getTransientField(TransientValues::resultFuture))
|
||||
override val context: InvocationContext get() = transientState!!.value.checkpoint.invocationContext
|
||||
override val ourIdentity: Party get() = transientState!!.value.checkpoint.ourIdentity
|
||||
override val context: InvocationContext get() = transientState!!.value.checkpoint.checkpointState.invocationContext
|
||||
override val ourIdentity: Party get() = transientState!!.value.checkpoint.checkpointState.ourIdentity
|
||||
internal var hasSoftLockedStates: Boolean = false
|
||||
set(value) {
|
||||
if (value) field = value else throw IllegalArgumentException("Can only set to true")
|
||||
@ -349,7 +349,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
*/
|
||||
@Suspendable
|
||||
private fun checkpointIfSubflowIdempotent(subFlow: Class<FlowLogic<*>>) {
|
||||
val currentFlow = snapshot().checkpoint.subFlowStack.last().flowClass
|
||||
val currentFlow = snapshot().checkpoint.checkpointState.subFlowStack.last().flowClass
|
||||
if (!currentFlow.isIdempotentFlow() && subFlow.isIdempotentFlow()) {
|
||||
suspend(FlowIORequest.ForceCheckpoint, false)
|
||||
}
|
||||
@ -453,7 +453,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
|
||||
private fun containsIdempotentFlows(): Boolean {
|
||||
val subFlowStack = snapshot().checkpoint.subFlowStack
|
||||
val subFlowStack = snapshot().checkpoint.checkpointState.subFlowStack
|
||||
return subFlowStack.any { IdempotentFlow::class.java.isAssignableFrom(it.flowClass) }
|
||||
}
|
||||
|
||||
|
@ -853,9 +853,9 @@ class SingleThreadedStateMachineManager(
|
||||
private fun getFlowSessionIds(checkpoint: Checkpoint): Set<SessionId> {
|
||||
val initiatedFlowStart = (checkpoint.flowState as? FlowState.Unstarted)?.flowStart as? FlowStart.Initiated
|
||||
return if (initiatedFlowStart == null) {
|
||||
checkpoint.sessions.keys
|
||||
checkpoint.checkpointState.sessions.keys
|
||||
} else {
|
||||
checkpoint.sessions.keys + initiatedFlowStart.initiatedSessionId
|
||||
checkpoint.checkpointState.sessions.keys + initiatedFlowStart.initiatedSessionId
|
||||
}
|
||||
}
|
||||
|
||||
@ -901,7 +901,7 @@ class SingleThreadedStateMachineManager(
|
||||
// final sanity checks
|
||||
require(lastState.pendingDeduplicationHandlers.isEmpty()) { "Flow cannot be removed until all pending deduplications have completed" }
|
||||
require(lastState.isRemoved) { "Flow must be in removable state before removal" }
|
||||
require(lastState.checkpoint.subFlowStack.size == 1) { "Checkpointed stack must be empty" }
|
||||
require(lastState.checkpoint.checkpointState.subFlowStack.size == 1) { "Checkpointed stack must be empty" }
|
||||
require(flow.fiber.id !in sessionToFlow.values) { "Flow fibre must not be needed by an existing session" }
|
||||
flow.resultFuture.set(removalReason.flowReturnValue)
|
||||
lastState.flowLogic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||
|
@ -232,7 +232,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
||||
}
|
||||
}
|
||||
|
||||
val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome)
|
||||
val numberOfSuspends = currentState.checkpoint.checkpointState.numberOfSuspends
|
||||
val record = MedicalRecord.Flow(time, flowFiber.id, numberOfSuspends, errors, report.by, outcome)
|
||||
medicalHistory.records += record
|
||||
recordsPublisher.onNext(record)
|
||||
Pair(event, backOffForChronicCondition)
|
||||
@ -314,7 +315,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
||||
}
|
||||
|
||||
fun timesDischargedForTheSameThing(by: Staff, currentState: StateMachineState): Int {
|
||||
val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends
|
||||
val lastAdmittanceSuspendCount = currentState.checkpoint.checkpointState.numberOfSuspends
|
||||
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount }
|
||||
}
|
||||
|
||||
|
@ -46,22 +46,15 @@ data class StateMachineState(
|
||||
)
|
||||
|
||||
/**
|
||||
* @param invocationContext the initiator of the flow.
|
||||
* @param ourIdentity the identity the flow is run as.
|
||||
* @param sessions map of source session ID to session state.
|
||||
* @param subFlowStack the stack of currently executing subflows.
|
||||
* @param checkpointState the state of the checkpoint
|
||||
* @param flowState the state of the flow itself, including the frozen fiber/FlowLogic.
|
||||
* @param errorState the "dirtiness" state including the involved errors and their propagation status.
|
||||
* @param numberOfSuspends the number of flow suspends due to IO API calls.
|
||||
*/
|
||||
data class Checkpoint(
|
||||
val invocationContext: InvocationContext,
|
||||
val ourIdentity: Party,
|
||||
val sessions: SessionMap, // This must preserve the insertion order!
|
||||
val subFlowStack: List<SubFlow>,
|
||||
val checkpointState: CheckpointState,
|
||||
val flowState: FlowState,
|
||||
val errorState: ErrorState,
|
||||
val numberOfSuspends: Int
|
||||
val result: Any? = null
|
||||
) {
|
||||
|
||||
val timestamp: Instant = Instant.now() // This will get updated every time a Checkpoint object is created/ created by copy.
|
||||
@ -79,19 +72,62 @@ data class Checkpoint(
|
||||
): Try<Checkpoint> {
|
||||
return SubFlow.create(flowLogicClass, subFlowVersion, isEnabledTimedFlow).map { topLevelSubFlow ->
|
||||
Checkpoint(
|
||||
invocationContext = invocationContext,
|
||||
ourIdentity = ourIdentity,
|
||||
sessions = emptyMap(),
|
||||
subFlowStack = listOf(topLevelSubFlow),
|
||||
checkpointState = CheckpointState(invocationContext, ourIdentity, emptyMap(), listOf(topLevelSubFlow), numberOfSuspends = 0),
|
||||
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic),
|
||||
errorState = ErrorState.Clean,
|
||||
numberOfSuspends = 0
|
||||
errorState = ErrorState.Clean
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of the Checkpoint with a new session map.
|
||||
* @param sessions the new map of session ID to session state.
|
||||
*/
|
||||
fun setSessions(sessions: SessionMap) : Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(sessions = sessions))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of the Checkpoint with an extra session added to the session map.
|
||||
* @param session the extra session to add.
|
||||
*/
|
||||
fun addSession(session: Pair<SessionId, SessionState>) : Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(sessions = checkpointState.sessions + session))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of the Checkpoint with a new subFlow stack.
|
||||
* @param subFlows the new List of subFlows.
|
||||
*/
|
||||
fun setSubflows(subFlows: List<SubFlow>) : Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(subFlowStack = subFlows))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of the Checkpoint with an extra subflow added to the subFlow Stack.
|
||||
* @param subFlow the subFlow to add to the stack of subFlows
|
||||
*/
|
||||
fun addSubflow(subFlow: SubFlow) : Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(subFlowStack = checkpointState.subFlowStack + subFlow))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param invocationContext the initiator of the flow.
|
||||
* @param ourIdentity the identity the flow is run as.
|
||||
* @param sessions map of source session ID to session state.
|
||||
* @param subFlowStack the stack of currently executing subflows.
|
||||
* @param numberOfSuspends the number of flow suspends due to IO API calls.
|
||||
*/
|
||||
data class CheckpointState(
|
||||
val invocationContext: InvocationContext,
|
||||
val ourIdentity: Party,
|
||||
val sessions: SessionMap, // This must preserve the insertion order!
|
||||
val subFlowStack: List<SubFlow>,
|
||||
val numberOfSuspends: Int
|
||||
)
|
||||
|
||||
/**
|
||||
* The state of a session.
|
||||
*/
|
||||
@ -251,4 +287,4 @@ sealed class SubFlowVersion {
|
||||
abstract val platformVersion: Int
|
||||
data class CoreFlow(override val platformVersion: Int) : SubFlowVersion()
|
||||
data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion()
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,6 @@ import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.DeclaredField
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.node.services.statemachine.Action
|
||||
import net.corda.node.services.statemachine.ConfirmSessionMessage
|
||||
import net.corda.node.services.statemachine.DataSessionMessage
|
||||
@ -48,7 +47,7 @@ class DeliverSessionMessageTransition(
|
||||
pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers + event.deduplicationHandler
|
||||
)
|
||||
// Check whether we have a session corresponding to the message.
|
||||
val existingSession = startingState.checkpoint.sessions[event.sessionMessage.recipientSessionId]
|
||||
val existingSession = startingState.checkpoint.checkpointState.sessions[event.sessionMessage.recipientSessionId]
|
||||
if (existingSession == null) {
|
||||
freshErrorTransition(CannotFindSessionException(event.sessionMessage.recipientSessionId))
|
||||
} else {
|
||||
@ -81,8 +80,8 @@ class DeliverSessionMessageTransition(
|
||||
errors = emptyList(),
|
||||
deduplicationSeed = sessionState.deduplicationSeed
|
||||
)
|
||||
val newCheckpoint = currentState.checkpoint.copy(
|
||||
sessions = currentState.checkpoint.sessions + (event.sessionMessage.recipientSessionId to initiatedSession)
|
||||
val newCheckpoint = currentState.checkpoint.addSession(
|
||||
event.sessionMessage.recipientSessionId to initiatedSession
|
||||
)
|
||||
// Send messages that were buffered pending confirmation of session.
|
||||
val sendActions = sessionState.bufferedMessages.map { (deduplicationId, bufferedMessage) ->
|
||||
@ -104,9 +103,10 @@ class DeliverSessionMessageTransition(
|
||||
val newSessionState = sessionState.copy(
|
||||
receivedMessages = sessionState.receivedMessages + message
|
||||
)
|
||||
|
||||
currentState = currentState.copy(
|
||||
checkpoint = currentState.checkpoint.copy(
|
||||
sessions = startingState.checkpoint.sessions + (event.sessionMessage.recipientSessionId to newSessionState)
|
||||
checkpoint = currentState.checkpoint.addSession(
|
||||
event.sessionMessage.recipientSessionId to newSessionState
|
||||
)
|
||||
)
|
||||
}
|
||||
@ -138,9 +138,7 @@ class DeliverSessionMessageTransition(
|
||||
val flowError = FlowError(payload.errorId, exception)
|
||||
val newSessionState = sessionState.copy(errors = sessionState.errors + flowError)
|
||||
currentState = currentState.copy(
|
||||
checkpoint = checkpoint.copy(
|
||||
sessions = checkpoint.sessions + (sessionId to newSessionState)
|
||||
)
|
||||
checkpoint = checkpoint.addSession(sessionId to newSessionState)
|
||||
)
|
||||
}
|
||||
else -> freshErrorTransition(UnexpectedEventInState())
|
||||
@ -159,9 +157,7 @@ class DeliverSessionMessageTransition(
|
||||
val sessionId = event.sessionMessage.recipientSessionId
|
||||
val flowError = FlowError(payload.errorId, exception)
|
||||
currentState = currentState.copy(
|
||||
checkpoint = checkpoint.copy(
|
||||
sessions = checkpoint.sessions + (sessionId to sessionState.copy(rejectionError = flowError))
|
||||
)
|
||||
checkpoint = checkpoint.addSession(sessionId to sessionState.copy(rejectionError = flowError))
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -171,7 +167,7 @@ class DeliverSessionMessageTransition(
|
||||
|
||||
private fun TransitionBuilder.endMessageTransition() {
|
||||
val sessionId = event.sessionMessage.recipientSessionId
|
||||
val sessions = currentState.checkpoint.sessions
|
||||
val sessions = currentState.checkpoint.checkpointState.sessions
|
||||
val sessionState = sessions[sessionId]
|
||||
if (sessionState == null) {
|
||||
return freshErrorTransition(CannotFindSessionException(sessionId))
|
||||
@ -180,9 +176,8 @@ class DeliverSessionMessageTransition(
|
||||
is SessionState.Initiated -> {
|
||||
val newSessionState = sessionState.copy(initiatedState = InitiatedSessionState.Ended)
|
||||
currentState = currentState.copy(
|
||||
checkpoint = currentState.checkpoint.copy(
|
||||
sessions = sessions + (sessionId to newSessionState)
|
||||
)
|
||||
checkpoint = currentState.checkpoint.addSession(sessionId to newSessionState)
|
||||
|
||||
)
|
||||
}
|
||||
else -> {
|
||||
|
@ -40,10 +40,13 @@ class ErrorFlowTransition(
|
||||
return builder {
|
||||
// If we're errored and propagating do the actual propagation and update the index.
|
||||
if (remainingErrorsToPropagate.isNotEmpty() && errorState.propagating) {
|
||||
val (initiatedSessions, newSessions) = bufferErrorMessagesInInitiatingSessions(startingState.checkpoint.sessions, errorMessages)
|
||||
val (initiatedSessions, newSessions) = bufferErrorMessagesInInitiatingSessions(
|
||||
startingState.checkpoint.checkpointState.sessions,
|
||||
errorMessages
|
||||
)
|
||||
val newCheckpoint = startingState.checkpoint.copy(
|
||||
errorState = errorState.copy(propagatedIndex = allErrors.size),
|
||||
sessions = newSessions
|
||||
checkpointState = startingState.checkpoint.checkpointState.copy(sessions = newSessions)
|
||||
)
|
||||
currentState = currentState.copy(checkpoint = newCheckpoint)
|
||||
actions.add(Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID))
|
||||
@ -65,7 +68,7 @@ class ErrorFlowTransition(
|
||||
Action.ReleaseSoftLocks(context.id.uuid),
|
||||
Action.CommitTransaction,
|
||||
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
|
||||
Action.RemoveSessionBindings(currentState.checkpoint.sessions.keys)
|
||||
Action.RemoveSessionBindings(currentState.checkpoint.checkpointState.sessions.keys)
|
||||
))
|
||||
|
||||
currentState = currentState.copy(
|
||||
|
@ -46,7 +46,7 @@ class StartedFlowTransition(
|
||||
|
||||
private fun waitForSessionConfirmationsTransition(): TransitionResult {
|
||||
return builder {
|
||||
if (currentState.checkpoint.sessions.values.any { it is SessionState.Initiating }) {
|
||||
if (currentState.checkpoint.checkpointState.sessions.values.any { it is SessionState.Initiating }) {
|
||||
FlowContinuation.ProcessEvents
|
||||
} else {
|
||||
resumeFlowLogic(Unit)
|
||||
@ -76,7 +76,7 @@ class StartedFlowTransition(
|
||||
val checkpoint = currentState.checkpoint
|
||||
val resultMap = LinkedHashMap<FlowSession, FlowInfo>()
|
||||
for ((sessionId, session) in sessionIdToSession) {
|
||||
val sessionState = checkpoint.sessions[sessionId]
|
||||
val sessionState = checkpoint.checkpointState.sessions[sessionId]
|
||||
if (sessionState is SessionState.Initiated) {
|
||||
resultMap[session] = sessionState.peerFlowInfo
|
||||
} else {
|
||||
@ -159,14 +159,14 @@ class StartedFlowTransition(
|
||||
sourceSessionIdToSessionMap: Map<SessionId, FlowSessionImpl>
|
||||
): Map<FlowSession, SerializedBytes<Any>>? {
|
||||
val checkpoint = currentState.checkpoint
|
||||
val pollResult = pollSessionMessages(checkpoint.sessions, sourceSessionIdToSessionMap.keys) ?: return null
|
||||
val pollResult = pollSessionMessages(checkpoint.checkpointState.sessions, sourceSessionIdToSessionMap.keys) ?: return null
|
||||
val resultMap = LinkedHashMap<FlowSession, SerializedBytes<Any>>()
|
||||
for ((sessionId, message) in pollResult.messages) {
|
||||
val session = sourceSessionIdToSessionMap[sessionId]!!
|
||||
resultMap[session] = message
|
||||
}
|
||||
currentState = currentState.copy(
|
||||
checkpoint = checkpoint.copy(sessions = pollResult.newSessionMap)
|
||||
checkpoint = checkpoint.setSessions(sessions = pollResult.newSessionMap)
|
||||
)
|
||||
return resultMap
|
||||
}
|
||||
@ -205,10 +205,10 @@ class StartedFlowTransition(
|
||||
|
||||
private fun TransitionBuilder.sendInitialSessionMessagesIfNeeded(sourceSessions: Set<SessionId>) {
|
||||
val checkpoint = startingState.checkpoint
|
||||
val newSessions = LinkedHashMap<SessionId, SessionState>(checkpoint.sessions)
|
||||
val newSessions = LinkedHashMap<SessionId, SessionState>(checkpoint.checkpointState.sessions)
|
||||
var index = 0
|
||||
for (sourceSessionId in sourceSessions) {
|
||||
val sessionState = checkpoint.sessions[sourceSessionId]
|
||||
val sessionState = checkpoint.checkpointState.sessions[sourceSessionId]
|
||||
if (sessionState == null) {
|
||||
return freshErrorTransition(CannotFindSessionException(sourceSessionId))
|
||||
}
|
||||
@ -225,7 +225,7 @@ class StartedFlowTransition(
|
||||
actions.add(Action.SendInitial(sessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
|
||||
newSessions[sourceSessionId] = newSessionState
|
||||
}
|
||||
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
|
||||
currentState = currentState.copy(checkpoint = checkpoint.setSessions(sessions = newSessions))
|
||||
}
|
||||
|
||||
private fun sendTransition(flowIORequest: FlowIORequest.Send): TransitionResult {
|
||||
@ -244,10 +244,10 @@ class StartedFlowTransition(
|
||||
|
||||
private fun TransitionBuilder.sendToSessionsTransition(sourceSessionIdToMessage: Map<SessionId, SerializedBytes<Any>>) {
|
||||
val checkpoint = startingState.checkpoint
|
||||
val newSessions = LinkedHashMap(checkpoint.sessions)
|
||||
val newSessions = LinkedHashMap(checkpoint.checkpointState.sessions)
|
||||
var index = 0
|
||||
for ((sourceSessionId, message) in sourceSessionIdToMessage) {
|
||||
val existingSessionState = checkpoint.sessions[sourceSessionId]
|
||||
val existingSessionState = checkpoint.checkpointState.sessions[sourceSessionId]
|
||||
if (existingSessionState == null) {
|
||||
return freshErrorTransition(CannotFindSessionException(sourceSessionId))
|
||||
} else {
|
||||
@ -286,7 +286,7 @@ class StartedFlowTransition(
|
||||
}
|
||||
|
||||
}
|
||||
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
|
||||
currentState = currentState.copy(checkpoint = checkpoint.setSessions(newSessions))
|
||||
}
|
||||
|
||||
private fun sessionToSessionId(session: FlowSession): SessionId {
|
||||
@ -295,7 +295,7 @@ class StartedFlowTransition(
|
||||
|
||||
private fun collectErroredSessionErrors(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
|
||||
return sessionIds.flatMap { sessionId ->
|
||||
val sessionState = checkpoint.sessions[sessionId]!!
|
||||
val sessionState = checkpoint.checkpointState.sessions[sessionId]!!
|
||||
when (sessionState) {
|
||||
is SessionState.Uninitiated -> emptyList()
|
||||
is SessionState.Initiating -> {
|
||||
@ -311,14 +311,14 @@ class StartedFlowTransition(
|
||||
}
|
||||
|
||||
private fun collectErroredInitiatingSessionErrors(checkpoint: Checkpoint): List<Throwable> {
|
||||
return checkpoint.sessions.values.mapNotNull { sessionState ->
|
||||
return checkpoint.checkpointState.sessions.values.mapNotNull { sessionState ->
|
||||
(sessionState as? SessionState.Initiating)?.rejectionError?.exception
|
||||
}
|
||||
}
|
||||
|
||||
private fun collectEndedSessionErrors(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
|
||||
return sessionIds.mapNotNull { sessionId ->
|
||||
val sessionState = checkpoint.sessions[sessionId]!!
|
||||
val sessionState = checkpoint.checkpointState.sessions[sessionId]!!
|
||||
when (sessionState) {
|
||||
is SessionState.Initiated -> {
|
||||
if (sessionState.initiatedState === InitiatedSessionState.Ended) {
|
||||
@ -338,7 +338,7 @@ class StartedFlowTransition(
|
||||
|
||||
private fun collectEndedEmptySessionErrors(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
|
||||
return sessionIds.mapNotNull { sessionId ->
|
||||
val sessionState = checkpoint.sessions[sessionId]!!
|
||||
val sessionState = checkpoint.checkpointState.sessions[sessionId]!!
|
||||
when (sessionState) {
|
||||
is SessionState.Initiated -> {
|
||||
if (sessionState.initiatedState === InitiatedSessionState.Ended &&
|
||||
@ -372,7 +372,7 @@ class StartedFlowTransition(
|
||||
collectErroredSessionErrors(sessionIds, checkpoint) + collectEndedSessionErrors(sessionIds, checkpoint)
|
||||
}
|
||||
is FlowIORequest.WaitForLedgerCommit -> {
|
||||
collectErroredSessionErrors(checkpoint.sessions.keys, checkpoint)
|
||||
collectErroredSessionErrors(checkpoint.checkpointState.sessions.keys, checkpoint)
|
||||
}
|
||||
is FlowIORequest.GetFlowInfo -> {
|
||||
collectErroredSessionErrors(flowIORequest.sessions.map(this::sessionToSessionId), checkpoint)
|
||||
@ -413,7 +413,7 @@ class StartedFlowTransition(
|
||||
return builder {
|
||||
// The `numberOfSuspends` is added to the deduplication ID in case an async
|
||||
// operation is executed multiple times within the same flow.
|
||||
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString()
|
||||
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.checkpointState.numberOfSuspends.toString()
|
||||
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation))
|
||||
FlowContinuation.ProcessEvents
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ class TopLevelTransition(
|
||||
return TransitionResult(
|
||||
newState = lastState,
|
||||
actions = listOf(
|
||||
Action.RemoveSessionBindings(startingState.checkpoint.sessions.keys),
|
||||
Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys),
|
||||
Action.RemoveFlow(context.id, FlowRemovalReason.SoftShutdown, lastState)
|
||||
),
|
||||
continuation = FlowContinuation.Abort
|
||||
@ -111,11 +111,9 @@ class TopLevelTransition(
|
||||
val subFlow = SubFlow.create(event.subFlowClass, event.subFlowVersion, event.isEnabledTimedFlow)
|
||||
when (subFlow) {
|
||||
is Try.Success -> {
|
||||
val containsTimedSubflow = containsTimedFlows(currentState.checkpoint.subFlowStack)
|
||||
val containsTimedSubflow = containsTimedFlows(currentState.checkpoint.checkpointState.subFlowStack)
|
||||
currentState = currentState.copy(
|
||||
checkpoint = currentState.checkpoint.copy(
|
||||
subFlowStack = currentState.checkpoint.subFlowStack + subFlow.value
|
||||
)
|
||||
checkpoint = currentState.checkpoint.addSubflow(subFlow.value)
|
||||
)
|
||||
// We don't schedule a timeout if there already is a timed subflow on the stack - a timeout had
|
||||
// been scheduled already.
|
||||
@ -134,17 +132,15 @@ class TopLevelTransition(
|
||||
private fun leaveSubFlowTransition(): TransitionResult {
|
||||
return builder {
|
||||
val checkpoint = currentState.checkpoint
|
||||
if (checkpoint.subFlowStack.isEmpty()) {
|
||||
if (checkpoint.checkpointState.subFlowStack.isEmpty()) {
|
||||
freshErrorTransition(UnexpectedEventInState())
|
||||
} else {
|
||||
val isLastSubFlowTimed = checkpoint.subFlowStack.last().isEnabledTimedFlow
|
||||
val newSubFlowStack = checkpoint.subFlowStack.dropLast(1)
|
||||
val isLastSubFlowTimed = checkpoint.checkpointState.subFlowStack.last().isEnabledTimedFlow
|
||||
val newSubFlowStack = checkpoint.checkpointState.subFlowStack.dropLast(1)
|
||||
currentState = currentState.copy(
|
||||
checkpoint = checkpoint.copy(
|
||||
subFlowStack = newSubFlowStack
|
||||
)
|
||||
checkpoint = checkpoint.setSubflows(newSubFlowStack)
|
||||
)
|
||||
if (isLastSubFlowTimed && !containsTimedFlows(currentState.checkpoint.subFlowStack)) {
|
||||
if (isLastSubFlowTimed && !containsTimedFlows(currentState.checkpoint.checkpointState.subFlowStack)) {
|
||||
actions.add(Action.CancelFlowTimeout(currentState.flowLogic.runId))
|
||||
}
|
||||
}
|
||||
@ -160,7 +156,9 @@ class TopLevelTransition(
|
||||
return builder {
|
||||
val newCheckpoint = currentState.checkpoint.copy(
|
||||
flowState = FlowState.Started(event.ioRequest, event.fiber),
|
||||
numberOfSuspends = currentState.checkpoint.numberOfSuspends + 1
|
||||
checkpointState = currentState.checkpoint.checkpointState.copy(
|
||||
numberOfSuspends = currentState.checkpoint.checkpointState.numberOfSuspends + 1
|
||||
)
|
||||
)
|
||||
if (event.maySkipCheckpoint) {
|
||||
actions.addAll(arrayOf(
|
||||
@ -198,13 +196,14 @@ class TopLevelTransition(
|
||||
val pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers
|
||||
currentState = currentState.copy(
|
||||
checkpoint = checkpoint.copy(
|
||||
numberOfSuspends = checkpoint.numberOfSuspends + 1
|
||||
),
|
||||
checkpointState = checkpoint.checkpointState.copy(
|
||||
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1
|
||||
)),
|
||||
pendingDeduplicationHandlers = emptyList(),
|
||||
isFlowResumed = false,
|
||||
isRemoved = true
|
||||
)
|
||||
val allSourceSessionIds = checkpoint.sessions.keys
|
||||
val allSourceSessionIds = checkpoint.checkpointState.sessions.keys
|
||||
if (currentState.isAnyCheckpointPersisted) {
|
||||
actions.add(Action.RemoveCheckpoint(context.id))
|
||||
}
|
||||
@ -230,7 +229,7 @@ class TopLevelTransition(
|
||||
}
|
||||
|
||||
private fun TransitionBuilder.sendEndMessages() {
|
||||
val sendEndMessageActions = currentState.checkpoint.sessions.values.mapIndexed { index, state ->
|
||||
val sendEndMessageActions = currentState.checkpoint.checkpointState.sessions.values.mapIndexed { index, state ->
|
||||
if (state is SessionState.Initiated && state.initiatedState is InitiatedSessionState.Live) {
|
||||
val message = ExistingSessionMessage(state.initiatedState.peerSinkSessionId, EndSessionMessage)
|
||||
val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index, state)
|
||||
@ -252,15 +251,15 @@ class TopLevelTransition(
|
||||
}
|
||||
val sourceSessionId = SessionId.createRandom(context.secureRandom)
|
||||
val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId)
|
||||
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
|
||||
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
|
||||
val newSessions = checkpoint.checkpointState.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
|
||||
currentState = currentState.copy(checkpoint = checkpoint.setSessions(newSessions))
|
||||
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
|
||||
FlowContinuation.Resume(sessionImpl)
|
||||
}
|
||||
}
|
||||
|
||||
private fun getClosestAncestorInitiatingSubFlow(checkpoint: Checkpoint): SubFlow.Initiating? {
|
||||
for (subFlow in checkpoint.subFlowStack.asReversed()) {
|
||||
for (subFlow in checkpoint.checkpointState.subFlowStack.asReversed()) {
|
||||
if (subFlow is SubFlow.Initiating) {
|
||||
return subFlow
|
||||
}
|
||||
|
@ -61,9 +61,7 @@ class UnstartedFlowTransition(
|
||||
val confirmationMessage = ConfirmSessionMessage(flowStart.initiatedSessionId, flowStart.initiatedFlowInfo)
|
||||
val sessionMessage = ExistingSessionMessage(initiatingMessage.initiatorSessionId, confirmationMessage)
|
||||
currentState = currentState.copy(
|
||||
checkpoint = currentState.checkpoint.copy(
|
||||
sessions = mapOf(flowStart.initiatedSessionId to initiatedState)
|
||||
)
|
||||
checkpoint = currentState.checkpoint.setSessions(mapOf(flowStart.initiatedSessionId to initiatedState))
|
||||
)
|
||||
actions.add(
|
||||
Action.SendExisting(
|
||||
|
Loading…
x
Reference in New Issue
Block a user