From b662b955748625aec08d1d898fdc111d01aa9c6f Mon Sep 17 00:00:00 2001 From: Dimos Raptis Date: Thu, 1 Oct 2020 11:58:15 +0100 Subject: [PATCH] Consolidate suspension time field with existing one from DB --- .../persistence/DBCheckpointStorage.kt | 31 ++++++++++--------- .../statemachine/StateMachineState.kt | 20 ++++++------ .../transitions/ErrorFlowTransition.kt | 4 +-- .../transitions/StartedFlowTransition.kt | 10 +++--- .../transitions/TopLevelTransition.kt | 14 ++++----- .../transitions/UnstartedFlowTransition.kt | 2 +- .../persistence/DBCheckpointStorageTests.kt | 3 +- 7 files changed, 44 insertions(+), 40 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index a3ccbf018c..b7f3e32f1a 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -377,7 +377,6 @@ class DBCheckpointStorage( serializedFlowState: SerializedBytes?, serializedCheckpointState: SerializedBytes ) { - val now = clock.instant() val flowId = id.uuid.toString() checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) @@ -386,14 +385,14 @@ class DBCheckpointStorage( flowId, serializedCheckpointState, serializedFlowState, - now + checkpoint.lastModificationTime ) - val metadata = createDBFlowMetadata(flowId, checkpoint, now) + val metadata = createDBFlowMetadata(flowId, checkpoint, checkpoint.lastModificationTime) val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { val errored = checkpoint.errorState as? ErrorState.Errored - errored?.run { createDBFlowException(flowId, errors.last().exception, now) } + errored?.run { createDBFlowException(flowId, errors.last().exception, checkpoint.lastModificationTime) } ?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}") } else { null @@ -410,7 +409,7 @@ class DBCheckpointStorage( compatible = checkpoint.compatible, progressStep = null, ioRequestType = null, - checkpointInstant = now + checkpointInstant = checkpoint.lastModificationTime ) currentDBSession().save(dbFlowCheckpoint) @@ -426,7 +425,6 @@ class DBCheckpointStorage( serializedFlowState: SerializedBytes?, serializedCheckpointState: SerializedBytes ) { - val now = clock.instant() val flowId = id.uuid.toString() val blob = if (checkpoint.status == FlowStatus.HOSPITALIZED) { @@ -445,13 +443,13 @@ class DBCheckpointStorage( flowId, serializedCheckpointState, serializedFlowState, - now + checkpoint.lastModificationTime ) } val dbFlowResult = if (checkpoint.status == FlowStatus.COMPLETED) { try { - createDBFlowResult(flowId, checkpoint.result, now) + createDBFlowResult(flowId, checkpoint.result, checkpoint.lastModificationTime) } catch (e: MissingSerializerException) { throw ResultSerializationException(e) } @@ -461,7 +459,7 @@ class DBCheckpointStorage( val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { val errored = checkpoint.errorState as? ErrorState.Errored - errored?.run { createDBFlowException(flowId, errors.last().exception, now) } + errored?.run { createDBFlowException(flowId, errors.last().exception, checkpoint.lastModificationTime) } ?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}") } else { null @@ -478,7 +476,7 @@ class DBCheckpointStorage( compatible = checkpoint.compatible, progressStep = checkpoint.progressStep?.take(MAX_PROGRESS_STEP_LENGTH), ioRequestType = checkpoint.flowIoRequest, - checkpointInstant = now + checkpointInstant = checkpoint.lastModificationTime ) currentDBSession().update(dbFlowCheckpoint) @@ -486,7 +484,7 @@ class DBCheckpointStorage( dbFlowResult?.let { currentDBSession().save(it) } dbFlowException?.let { currentDBSession().save(it) } if (checkpoint.isFinished()) { - setDBFlowMetadataFinishTime(flowId, now) + setDBFlowMetadataFinishTime(flowId, checkpoint.lastModificationTime) } } @@ -559,7 +557,7 @@ class DBCheckpointStorage( override fun getPausedCheckpoints(): Stream> { val session = currentDBSession() val jpqlQuery = """select new ${DBPausedFields::class.java.name}(checkpoint.id, blob.checkpoint, checkpoint.status, - checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible, exception.id) + checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible, exception.id, checkpoint.checkpointInstant) from ${DBFlowCheckpoint::class.java.name} checkpoint join ${DBFlowCheckpointBlob::class.java.name} blob on checkpoint.blob = blob.id left outer join ${DBFlowException::class.java.name} exception on checkpoint.exceptionDetails = exception.id @@ -728,7 +726,8 @@ class DBCheckpointStorage( status = status, progressStep = progressStep, flowIoRequest = ioRequestType, - compatible = compatible + compatible = compatible, + lastModificationTime = blob!!.persistedInstant ) } @@ -739,7 +738,8 @@ class DBCheckpointStorage( val progressStep: String?, val ioRequestType: String?, val compatible: Boolean, - exception: String? + exception: String?, + val persistedInstant: Instant ) { val wasHospitalized = exception != null fun toSerializedCheckpoint(): Checkpoint.Serialized { @@ -752,7 +752,8 @@ class DBCheckpointStorage( status = status, progressStep = progressStep, flowIoRequest = ioRequestType, - compatible = compatible + compatible = compatible, + lastModificationTime = persistedInstant ) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 37b1afe6b2..18b275ffab 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -94,6 +94,7 @@ data class StateMachineState( * @param checkpointState the state of the checkpoint * @param flowState the state of the flow itself, including the frozen fiber/FlowLogic. * @param errorState the "dirtiness" state including the involved errors and their propagation status. + * @param lastModificationTime the time this checkpoint was last modified, which also corresponds conceptually to the last suspension point. */ data class Checkpoint( val checkpointState: CheckpointState, @@ -103,7 +104,8 @@ data class Checkpoint( val status: FlowStatus = FlowStatus.RUNNABLE, val progressStep: String? = null, val flowIoRequest: String? = null, - val compatible: Boolean = true + val compatible: Boolean = true, + val lastModificationTime: Instant ) { @CordaSerializable enum class FlowStatus { @@ -144,11 +146,11 @@ data class Checkpoint( listOf(topLevelSubFlow), numberOfSuspends = 0, // We set this to 1 here to avoid an extra copy and increment in UnstartedFlowTransition.createInitialCheckpoint - numberOfCommits = 1, - suspensionTime = timestamp + numberOfCommits = 1 ), flowState = FlowState.Unstarted(flowStart, frozenFlowLogic), - errorState = ErrorState.Clean + errorState = ErrorState.Clean, + lastModificationTime = timestamp ) } } @@ -217,7 +219,8 @@ data class Checkpoint( val status: FlowStatus, val progressStep: String?, val flowIoRequest: String?, - val compatible: Boolean + val compatible: Boolean, + val lastModificationTime: Instant ) { /** * Deserializes the serialized fields contained in [Checkpoint.Serialized]. @@ -238,7 +241,8 @@ data class Checkpoint( status = status, progressStep = progressStep, flowIoRequest = flowIoRequest, - compatible = compatible + compatible = compatible, + lastModificationTime = lastModificationTime ) } } @@ -252,7 +256,6 @@ data class Checkpoint( * @param subFlowStack The stack of currently executing subflows. * @param numberOfSuspends The number of flow suspends due to IO API calls. * @param numberOfCommits The number of times this checkpoint has been persisted. - * @param suspensionTime the time of the last suspension. This is supposed to be used as a stable timestamp in case of replays. */ @CordaSerializable data class CheckpointState( @@ -262,8 +265,7 @@ data class CheckpointState( val sessionsToBeClosed: Set, val subFlowStack: List, val numberOfSuspends: Int, - val numberOfCommits: Int, - val suspensionTime: Instant + val numberOfCommits: Int ) /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt index baf2293100..3ecabd8e0d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt @@ -138,7 +138,7 @@ class ErrorFlowTransition( if (sessionState is SessionState.Initiating && sessionState.rejectionError == null) { val errorsWithId = errorMessages.mapIndexed { idx, errorMsg -> val messageId = MessageIdentifier(MessageType.SESSION_ERROR, sessionState.shardId, sourceSessionId.calculateInitiatedSessionId(), - sessionState.nextSendingSeqNumber+idx, currentState.checkpoint.checkpointState.suspensionTime) + sessionState.nextSendingSeqNumber+idx, currentState.checkpoint.lastModificationTime) messageId to errorMsg } @@ -148,7 +148,7 @@ class ErrorFlowTransition( else if (sessionState is SessionState.Initiated && !sessionState.otherSideErrored) { val errorsWithId = errorMessages.mapIndexed { idx, errorMsg -> val messageId = MessageIdentifier(MessageType.SESSION_ERROR, sessionState.shardId, sessionState.peerSinkSessionId, - sessionState.nextSendingSeqNumber+idx, currentState.checkpoint.checkpointState.suspensionTime) + sessionState.nextSendingSeqNumber+idx, currentState.checkpoint.lastModificationTime) messageId to errorMsg }.toList() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 8034ac36c1..ba8cabfeb7 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -184,7 +184,7 @@ class StartedFlowTransition( val sinkSessionId = (sessionState as SessionState.Initiated).peerSinkSessionId val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage) val messageType = MessageType.inferFromMessage(message) - val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generate(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generate(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.lastModificationTime) Action.SendExisting(sessionState.peerParty, message, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID)) } val signalSessionsEndMap = existingSessionsToRemove.map { (sessionId, _) -> @@ -303,7 +303,7 @@ class StartedFlowTransition( lastSenderSeqNo = null ) val messageType = MessageType.inferFromMessage(initialMessage) - val messageIdentifier = MessageIdentifier(messageType, shardId, counterpartySessionId, 0, checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(messageType, shardId, counterpartySessionId, 0, checkpoint.lastModificationTime) actions.add(Action.SendInitial(sessionState.destination, initialMessage, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID))) newSessions[sourceSessionId] = newSessionState } @@ -360,14 +360,14 @@ class StartedFlowTransition( ) val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, message) val messageType = MessageType.inferFromMessage(initialMessage) - val messageIdentifier = MessageIdentifier(messageType, shardId, sourceSessionId.calculateInitiatedSessionId(), 0, checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(messageType, shardId, sourceSessionId.calculateInitiatedSessionId(), 0, checkpoint.lastModificationTime) Action.SendInitial(uninitiatedSessionState.destination, initialMessage, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID)) } } ?: emptyList() messagesByType[SessionState.Initiating::class]?.forEach { (sourceSessionId, sessionState, message) -> val initiatingSessionState = sessionState as SessionState.Initiating val sessionMessage = DataSessionMessage(message) - val messageIdentifier = MessageIdentifier(MessageType.DATA_MESSAGE, sessionState.shardId, sourceSessionId.calculateInitiatedSessionId(), sessionState.nextSendingSeqNumber, checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(MessageType.DATA_MESSAGE, sessionState.shardId, sourceSessionId.calculateInitiatedSessionId(), sessionState.nextSendingSeqNumber, checkpoint.lastModificationTime) newSessions[sourceSessionId] = initiatingSessionState.bufferMessage(messageIdentifier, sessionMessage) } val sendExistingActions = messagesByType[SessionState.Initiated::class]?.map {(sourceSessionId, sessionState, message) -> @@ -376,7 +376,7 @@ class StartedFlowTransition( val sinkSessionId = initiatedSessionState.peerSinkSessionId val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage) val messageType = MessageType.inferFromMessage(existingMessage) - val messageIdentifier = MessageIdentifier(messageType, sessionState.shardId, sessionState.peerSinkSessionId, sessionState.nextSendingSeqNumber, checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(messageType, sessionState.shardId, sessionState.peerSinkSessionId, sessionState.nextSendingSeqNumber, checkpoint.lastModificationTime) newSessions[sourceSessionId] = initiatedSessionState.copy(nextSendingSeqNumber = initiatedSessionState.nextSendingSeqNumber + 1) Action.SendExisting(initiatedSessionState.peerParty, existingMessage, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID)) } ?: emptyList() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 4e1d84e4ea..9fe55b9ee7 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -198,14 +198,14 @@ class TopLevelTransition( checkpointState.invocationContext }, numberOfSuspends = checkpointState.numberOfSuspends + 1, - numberOfCommits = checkpointState.numberOfCommits + 1, - suspensionTime = context.time + numberOfCommits = checkpointState.numberOfCommits + 1 ) copy( flowState = FlowState.Started(event.ioRequest, event.fiber), checkpointState = newCheckpointState, flowIoRequest = event.ioRequest::class.java.simpleName, - progressStep = event.progressStep?.label + progressStep = event.progressStep?.label, + lastModificationTime = context.time ) } if (event.maySkipCheckpoint) { @@ -244,12 +244,12 @@ class TopLevelTransition( checkpoint = checkpoint.copy( checkpointState = checkpoint.checkpointState.copy( numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1, - numberOfCommits = checkpoint.checkpointState.numberOfCommits + 1, - suspensionTime = context.time + numberOfCommits = checkpoint.checkpointState.numberOfCommits + 1 ), flowState = FlowState.Finished, result = event.returnValue, - status = Checkpoint.FlowStatus.COMPLETED + status = Checkpoint.FlowStatus.COMPLETED, + lastModificationTime = context.time ).removeSessions(checkpoint.checkpointState.sessions.keys), closedSessionsPendingToBeSignalled = emptyMap(), pendingDeduplicationHandlers = emptyList(), @@ -299,7 +299,7 @@ class TopLevelTransition( if (state is SessionState.Initiated) { val message = ExistingSessionMessage(state.peerSinkSessionId, EndSessionMessage) val messageType = MessageType.inferFromMessage(message) - val messageIdentifier = MessageIdentifier(messageType, state.shardId, state.peerSinkSessionId, state.nextSendingSeqNumber, startingState.checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(messageType, state.shardId, state.peerSinkSessionId, state.nextSendingSeqNumber, startingState.checkpoint.lastModificationTime) Action.SendExisting(state.peerParty, message, SenderDeduplicationInfo(messageIdentifier, startingState.senderUUID)) } else { null diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt index 127a2052f1..b4d608cc3a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt @@ -69,7 +69,7 @@ class UnstartedFlowTransition( val confirmationMessage = ConfirmSessionMessage(flowStart.initiatedSessionId, flowStart.initiatedFlowInfo) val sessionMessage = ExistingSessionMessage(initiatingMessage.initiatorSessionId, confirmationMessage) val messageType = MessageType.inferFromMessage(sessionMessage) - val messageIdentifier = MessageIdentifier(messageType, flowStart.shardIdentifier, initiatingMessage.initiatorSessionId, 0, currentState.checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(messageType, flowStart.shardIdentifier, initiatingMessage.initiatorSessionId, 0, currentState.checkpoint.lastModificationTime) currentState = currentState.copy(checkpoint = currentState.checkpoint.setSessions(mapOf(flowStart.initiatedSessionId to initiatedState))) actions.add(Action.SendExisting(flowStart.peerSession.counterparty, sessionMessage, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID))) } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index 1b4f84a3dd..02c77ff2d0 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -130,7 +130,8 @@ class DBCheckpointStorageTests { logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) ), progressStep = "I have made progress", - flowIoRequest = FlowIORequest.SendAndReceive::class.java.simpleName + flowIoRequest = FlowIORequest.SendAndReceive::class.java.simpleName, + lastModificationTime = Clock.systemUTC().instant() ) val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() database.transaction {