Consolidate suspension time field with existing one from DB

This commit is contained in:
Dimos Raptis 2020-10-01 11:58:15 +01:00
parent 40766183a3
commit b662b95574
7 changed files with 44 additions and 40 deletions

View File

@ -377,7 +377,6 @@ class DBCheckpointStorage(
serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>
) {
val now = clock.instant()
val flowId = id.uuid.toString()
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
@ -386,14 +385,14 @@ class DBCheckpointStorage(
flowId,
serializedCheckpointState,
serializedFlowState,
now
checkpoint.lastModificationTime
)
val metadata = createDBFlowMetadata(flowId, checkpoint, now)
val metadata = createDBFlowMetadata(flowId, checkpoint, checkpoint.lastModificationTime)
val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val errored = checkpoint.errorState as? ErrorState.Errored
errored?.run { createDBFlowException(flowId, errors.last().exception, now) }
errored?.run { createDBFlowException(flowId, errors.last().exception, checkpoint.lastModificationTime) }
?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}")
} else {
null
@ -410,7 +409,7 @@ class DBCheckpointStorage(
compatible = checkpoint.compatible,
progressStep = null,
ioRequestType = null,
checkpointInstant = now
checkpointInstant = checkpoint.lastModificationTime
)
currentDBSession().save(dbFlowCheckpoint)
@ -426,7 +425,6 @@ class DBCheckpointStorage(
serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>
) {
val now = clock.instant()
val flowId = id.uuid.toString()
val blob = if (checkpoint.status == FlowStatus.HOSPITALIZED) {
@ -445,13 +443,13 @@ class DBCheckpointStorage(
flowId,
serializedCheckpointState,
serializedFlowState,
now
checkpoint.lastModificationTime
)
}
val dbFlowResult = if (checkpoint.status == FlowStatus.COMPLETED) {
try {
createDBFlowResult(flowId, checkpoint.result, now)
createDBFlowResult(flowId, checkpoint.result, checkpoint.lastModificationTime)
} catch (e: MissingSerializerException) {
throw ResultSerializationException(e)
}
@ -461,7 +459,7 @@ class DBCheckpointStorage(
val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val errored = checkpoint.errorState as? ErrorState.Errored
errored?.run { createDBFlowException(flowId, errors.last().exception, now) }
errored?.run { createDBFlowException(flowId, errors.last().exception, checkpoint.lastModificationTime) }
?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}")
} else {
null
@ -478,7 +476,7 @@ class DBCheckpointStorage(
compatible = checkpoint.compatible,
progressStep = checkpoint.progressStep?.take(MAX_PROGRESS_STEP_LENGTH),
ioRequestType = checkpoint.flowIoRequest,
checkpointInstant = now
checkpointInstant = checkpoint.lastModificationTime
)
currentDBSession().update(dbFlowCheckpoint)
@ -486,7 +484,7 @@ class DBCheckpointStorage(
dbFlowResult?.let { currentDBSession().save(it) }
dbFlowException?.let { currentDBSession().save(it) }
if (checkpoint.isFinished()) {
setDBFlowMetadataFinishTime(flowId, now)
setDBFlowMetadataFinishTime(flowId, checkpoint.lastModificationTime)
}
}
@ -559,7 +557,7 @@ class DBCheckpointStorage(
override fun getPausedCheckpoints(): Stream<Triple<StateMachineRunId, Checkpoint.Serialized, Boolean>> {
val session = currentDBSession()
val jpqlQuery = """select new ${DBPausedFields::class.java.name}(checkpoint.id, blob.checkpoint, checkpoint.status,
checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible, exception.id)
checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible, exception.id, checkpoint.checkpointInstant)
from ${DBFlowCheckpoint::class.java.name} checkpoint
join ${DBFlowCheckpointBlob::class.java.name} blob on checkpoint.blob = blob.id
left outer join ${DBFlowException::class.java.name} exception on checkpoint.exceptionDetails = exception.id
@ -728,7 +726,8 @@ class DBCheckpointStorage(
status = status,
progressStep = progressStep,
flowIoRequest = ioRequestType,
compatible = compatible
compatible = compatible,
lastModificationTime = blob!!.persistedInstant
)
}
@ -739,7 +738,8 @@ class DBCheckpointStorage(
val progressStep: String?,
val ioRequestType: String?,
val compatible: Boolean,
exception: String?
exception: String?,
val persistedInstant: Instant
) {
val wasHospitalized = exception != null
fun toSerializedCheckpoint(): Checkpoint.Serialized {
@ -752,7 +752,8 @@ class DBCheckpointStorage(
status = status,
progressStep = progressStep,
flowIoRequest = ioRequestType,
compatible = compatible
compatible = compatible,
lastModificationTime = persistedInstant
)
}
}

View File

@ -94,6 +94,7 @@ data class StateMachineState(
* @param checkpointState the state of the checkpoint
* @param flowState the state of the flow itself, including the frozen fiber/FlowLogic.
* @param errorState the "dirtiness" state including the involved errors and their propagation status.
* @param lastModificationTime the time this checkpoint was last modified, which also corresponds conceptually to the last suspension point.
*/
data class Checkpoint(
val checkpointState: CheckpointState,
@ -103,7 +104,8 @@ data class Checkpoint(
val status: FlowStatus = FlowStatus.RUNNABLE,
val progressStep: String? = null,
val flowIoRequest: String? = null,
val compatible: Boolean = true
val compatible: Boolean = true,
val lastModificationTime: Instant
) {
@CordaSerializable
enum class FlowStatus {
@ -144,11 +146,11 @@ data class Checkpoint(
listOf(topLevelSubFlow),
numberOfSuspends = 0,
// We set this to 1 here to avoid an extra copy and increment in UnstartedFlowTransition.createInitialCheckpoint
numberOfCommits = 1,
suspensionTime = timestamp
numberOfCommits = 1
),
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic),
errorState = ErrorState.Clean
errorState = ErrorState.Clean,
lastModificationTime = timestamp
)
}
}
@ -217,7 +219,8 @@ data class Checkpoint(
val status: FlowStatus,
val progressStep: String?,
val flowIoRequest: String?,
val compatible: Boolean
val compatible: Boolean,
val lastModificationTime: Instant
) {
/**
* Deserializes the serialized fields contained in [Checkpoint.Serialized].
@ -238,7 +241,8 @@ data class Checkpoint(
status = status,
progressStep = progressStep,
flowIoRequest = flowIoRequest,
compatible = compatible
compatible = compatible,
lastModificationTime = lastModificationTime
)
}
}
@ -252,7 +256,6 @@ data class Checkpoint(
* @param subFlowStack The stack of currently executing subflows.
* @param numberOfSuspends The number of flow suspends due to IO API calls.
* @param numberOfCommits The number of times this checkpoint has been persisted.
* @param suspensionTime the time of the last suspension. This is supposed to be used as a stable timestamp in case of replays.
*/
@CordaSerializable
data class CheckpointState(
@ -262,8 +265,7 @@ data class CheckpointState(
val sessionsToBeClosed: Set<SessionId>,
val subFlowStack: List<SubFlow>,
val numberOfSuspends: Int,
val numberOfCommits: Int,
val suspensionTime: Instant
val numberOfCommits: Int
)
/**

View File

@ -138,7 +138,7 @@ class ErrorFlowTransition(
if (sessionState is SessionState.Initiating && sessionState.rejectionError == null) {
val errorsWithId = errorMessages.mapIndexed { idx, errorMsg ->
val messageId = MessageIdentifier(MessageType.SESSION_ERROR, sessionState.shardId, sourceSessionId.calculateInitiatedSessionId(),
sessionState.nextSendingSeqNumber+idx, currentState.checkpoint.checkpointState.suspensionTime)
sessionState.nextSendingSeqNumber+idx, currentState.checkpoint.lastModificationTime)
messageId to errorMsg
}
@ -148,7 +148,7 @@ class ErrorFlowTransition(
else if (sessionState is SessionState.Initiated && !sessionState.otherSideErrored) {
val errorsWithId = errorMessages.mapIndexed { idx, errorMsg ->
val messageId = MessageIdentifier(MessageType.SESSION_ERROR, sessionState.shardId, sessionState.peerSinkSessionId,
sessionState.nextSendingSeqNumber+idx, currentState.checkpoint.checkpointState.suspensionTime)
sessionState.nextSendingSeqNumber+idx, currentState.checkpoint.lastModificationTime)
messageId to errorMsg
}.toList()

View File

@ -184,7 +184,7 @@ class StartedFlowTransition(
val sinkSessionId = (sessionState as SessionState.Initiated).peerSinkSessionId
val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage)
val messageType = MessageType.inferFromMessage(message)
val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generate(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime)
val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generate(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.lastModificationTime)
Action.SendExisting(sessionState.peerParty, message, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID))
}
val signalSessionsEndMap = existingSessionsToRemove.map { (sessionId, _) ->
@ -303,7 +303,7 @@ class StartedFlowTransition(
lastSenderSeqNo = null
)
val messageType = MessageType.inferFromMessage(initialMessage)
val messageIdentifier = MessageIdentifier(messageType, shardId, counterpartySessionId, 0, checkpoint.checkpointState.suspensionTime)
val messageIdentifier = MessageIdentifier(messageType, shardId, counterpartySessionId, 0, checkpoint.lastModificationTime)
actions.add(Action.SendInitial(sessionState.destination, initialMessage, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID)))
newSessions[sourceSessionId] = newSessionState
}
@ -360,14 +360,14 @@ class StartedFlowTransition(
)
val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, message)
val messageType = MessageType.inferFromMessage(initialMessage)
val messageIdentifier = MessageIdentifier(messageType, shardId, sourceSessionId.calculateInitiatedSessionId(), 0, checkpoint.checkpointState.suspensionTime)
val messageIdentifier = MessageIdentifier(messageType, shardId, sourceSessionId.calculateInitiatedSessionId(), 0, checkpoint.lastModificationTime)
Action.SendInitial(uninitiatedSessionState.destination, initialMessage, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID))
}
} ?: emptyList()
messagesByType[SessionState.Initiating::class]?.forEach { (sourceSessionId, sessionState, message) ->
val initiatingSessionState = sessionState as SessionState.Initiating
val sessionMessage = DataSessionMessage(message)
val messageIdentifier = MessageIdentifier(MessageType.DATA_MESSAGE, sessionState.shardId, sourceSessionId.calculateInitiatedSessionId(), sessionState.nextSendingSeqNumber, checkpoint.checkpointState.suspensionTime)
val messageIdentifier = MessageIdentifier(MessageType.DATA_MESSAGE, sessionState.shardId, sourceSessionId.calculateInitiatedSessionId(), sessionState.nextSendingSeqNumber, checkpoint.lastModificationTime)
newSessions[sourceSessionId] = initiatingSessionState.bufferMessage(messageIdentifier, sessionMessage)
}
val sendExistingActions = messagesByType[SessionState.Initiated::class]?.map {(sourceSessionId, sessionState, message) ->
@ -376,7 +376,7 @@ class StartedFlowTransition(
val sinkSessionId = initiatedSessionState.peerSinkSessionId
val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage)
val messageType = MessageType.inferFromMessage(existingMessage)
val messageIdentifier = MessageIdentifier(messageType, sessionState.shardId, sessionState.peerSinkSessionId, sessionState.nextSendingSeqNumber, checkpoint.checkpointState.suspensionTime)
val messageIdentifier = MessageIdentifier(messageType, sessionState.shardId, sessionState.peerSinkSessionId, sessionState.nextSendingSeqNumber, checkpoint.lastModificationTime)
newSessions[sourceSessionId] = initiatedSessionState.copy(nextSendingSeqNumber = initiatedSessionState.nextSendingSeqNumber + 1)
Action.SendExisting(initiatedSessionState.peerParty, existingMessage, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID))
} ?: emptyList()

View File

@ -198,14 +198,14 @@ class TopLevelTransition(
checkpointState.invocationContext
},
numberOfSuspends = checkpointState.numberOfSuspends + 1,
numberOfCommits = checkpointState.numberOfCommits + 1,
suspensionTime = context.time
numberOfCommits = checkpointState.numberOfCommits + 1
)
copy(
flowState = FlowState.Started(event.ioRequest, event.fiber),
checkpointState = newCheckpointState,
flowIoRequest = event.ioRequest::class.java.simpleName,
progressStep = event.progressStep?.label
progressStep = event.progressStep?.label,
lastModificationTime = context.time
)
}
if (event.maySkipCheckpoint) {
@ -244,12 +244,12 @@ class TopLevelTransition(
checkpoint = checkpoint.copy(
checkpointState = checkpoint.checkpointState.copy(
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1,
numberOfCommits = checkpoint.checkpointState.numberOfCommits + 1,
suspensionTime = context.time
numberOfCommits = checkpoint.checkpointState.numberOfCommits + 1
),
flowState = FlowState.Finished,
result = event.returnValue,
status = Checkpoint.FlowStatus.COMPLETED
status = Checkpoint.FlowStatus.COMPLETED,
lastModificationTime = context.time
).removeSessions(checkpoint.checkpointState.sessions.keys),
closedSessionsPendingToBeSignalled = emptyMap(),
pendingDeduplicationHandlers = emptyList(),
@ -299,7 +299,7 @@ class TopLevelTransition(
if (state is SessionState.Initiated) {
val message = ExistingSessionMessage(state.peerSinkSessionId, EndSessionMessage)
val messageType = MessageType.inferFromMessage(message)
val messageIdentifier = MessageIdentifier(messageType, state.shardId, state.peerSinkSessionId, state.nextSendingSeqNumber, startingState.checkpoint.checkpointState.suspensionTime)
val messageIdentifier = MessageIdentifier(messageType, state.shardId, state.peerSinkSessionId, state.nextSendingSeqNumber, startingState.checkpoint.lastModificationTime)
Action.SendExisting(state.peerParty, message, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID))
} else {
null

View File

@ -69,7 +69,7 @@ class UnstartedFlowTransition(
val confirmationMessage = ConfirmSessionMessage(flowStart.initiatedSessionId, flowStart.initiatedFlowInfo)
val sessionMessage = ExistingSessionMessage(initiatingMessage.initiatorSessionId, confirmationMessage)
val messageType = MessageType.inferFromMessage(sessionMessage)
val messageIdentifier = MessageIdentifier(messageType, flowStart.shardIdentifier, initiatingMessage.initiatorSessionId, 0, currentState.checkpoint.checkpointState.suspensionTime)
val messageIdentifier = MessageIdentifier(messageType, flowStart.shardIdentifier, initiatingMessage.initiatorSessionId, 0, currentState.checkpoint.lastModificationTime)
currentState = currentState.copy(checkpoint = currentState.checkpoint.setSessions(mapOf(flowStart.initiatedSessionId to initiatedState)))
actions.add(Action.SendExisting(flowStart.peerSession.counterparty, sessionMessage, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID)))
}

View File

@ -130,7 +130,8 @@ class DBCheckpointStorageTests {
logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
),
progressStep = "I have made progress",
flowIoRequest = FlowIORequest.SendAndReceive::class.java.simpleName
flowIoRequest = FlowIORequest.SendAndReceive::class.java.simpleName,
lastModificationTime = Clock.systemUTC().instant()
)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {