diff --git a/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt index ff36d5e7a2..a983b55798 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt @@ -11,9 +11,7 @@ import net.corda.core.node.services.CordaService import net.corda.core.node.services.vault.SessionScope import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.getOrThrow -import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.Checkpoint.FlowStatus -import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver @@ -71,7 +69,7 @@ class CordaPersistenceServiceTests { hmac = ByteArray(16), persistedInstant = now ), - result = DBCheckpointStorage.DBFlowResult(value = ByteArray(16), persistedInstant = now), + result = null, exceptionDetails = null, status = FlowStatus.RUNNABLE, compatible = false, diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 45a6739b2a..adde4e489f 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -37,6 +37,7 @@ import javax.persistence.OneToOne /** * Simple checkpoint key value storage in DB. */ +@Suppress("TooManyFunctions") class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointPerformanceRecorder) : CheckpointStorage { companion object { @@ -83,11 +84,11 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP @JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id") var blob: DBFlowCheckpointBlob, - @OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true) + @OneToOne(fetch = FetchType.LAZY, optional = true) @JoinColumn(name = "result_id", referencedColumnName = "id") var result: DBFlowResult?, - @OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true) + @OneToOne(fetch = FetchType.LAZY, optional = true) @JoinColumn(name = "error_id", referencedColumnName = "id") var exceptionDetails: DBFlowException?, @@ -117,7 +118,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP @Id @GeneratedValue(strategy = GenerationType.SEQUENCE) @Column(name = "id", nullable = false) - private var id: Long = 0, + var id: Long = 0, @Type(type = "corda-blob") @Column(name = "checkpoint_value", nullable = false) @@ -141,7 +142,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP @Id @Column(name = "id", nullable = false) @GeneratedValue(strategy = GenerationType.SEQUENCE) - private var id: Long = 0, + var id: Long = 0, @Type(type = "corda-blob") @Column(name = "result_value", nullable = false) @@ -157,7 +158,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP @Id @Column(name = "id", nullable = false) @GeneratedValue(strategy = GenerationType.SEQUENCE) - private var id: Long = 0, + var id: Long = 0, @Column(name = "type", nullable = false) var type: Class, @@ -319,14 +320,16 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP val flowId = id.uuid.toString() val now = Instant.now() + // Load the previous entity from the hibernate cache so the meta data join does not get updated + val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId) + val serializedCheckpointState = checkpoint.checkpointState.storageSerialize() checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now) - val result = checkpoint.result?.let { createDBFlowResult(it, now) } - val exceptionDetails = (checkpoint.errorState as? ErrorState.Errored)?.let { createDBFlowException(it, now) } - // Load the previous entity from the hibernate cache so the meta data join does not get updated - val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId) + val result = updateDBFlowResult(entity, checkpoint, now) + val exceptionDetails = updateDBFlowException(entity, checkpoint, now) + return entity.apply { this.blob = blob this.result = result @@ -354,6 +357,31 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP ) } + /** + * Creates, updates or deletes the result related to the current flow/checkpoint. + * + * This is needed because updates are not cascading via Hibernate, therefore operations must be handled manually. + * + * A [DBFlowResult] is created if [DBFlowCheckpoint.result] does not exist and the [Checkpoint] has a result.. + * The existing [DBFlowResult] is updated if [DBFlowCheckpoint.result] exists and the [Checkpoint] has a result. + * The existing [DBFlowResult] is deleted if [DBFlowCheckpoint.result] exists and the [Checkpoint] has no result. + * Nothing happens if both [DBFlowCheckpoint] and [Checkpoint] do not have a result. + */ + private fun updateDBFlowResult(entity: DBFlowCheckpoint, checkpoint: Checkpoint, now: Instant): DBFlowResult? { + val result = checkpoint.result?.let { createDBFlowResult(it, now) } + if (entity.result != null) { + if (result != null) { + result.id = entity.result!!.id + currentDBSession().update(result) + } else { + currentDBSession().delete(entity.result) + } + } else if (result != null) { + currentDBSession().save(result) + } + return result + } + private fun createDBFlowResult(result: Any, now: Instant): DBFlowResult { return DBFlowResult( value = result.storageSerialize().bytes, @@ -361,6 +389,31 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP ) } + /** + * Creates, updates or deletes the error related to the current flow/checkpoint. + * + * This is needed because updates are not cascading via Hibernate, therefore operations must be handled manually. + * + * A [DBFlowException] is created if [DBFlowCheckpoint.exceptionDetails] does not exist and the [Checkpoint] has an error attached to it. + * The existing [DBFlowException] is updated if [DBFlowCheckpoint.exceptionDetails] exists and the [Checkpoint] has an error. + * The existing [DBFlowException] is deleted if [DBFlowCheckpoint.exceptionDetails] exists and the [Checkpoint] has no error. + * Nothing happens if both [DBFlowCheckpoint] and [Checkpoint] are related to no errors. + */ + private fun updateDBFlowException(entity: DBFlowCheckpoint, checkpoint: Checkpoint, now: Instant): DBFlowException? { + val exceptionDetails = (checkpoint.errorState as? ErrorState.Errored)?.let { createDBFlowException(it, now) } + if (entity.exceptionDetails != null) { + if (exceptionDetails != null) { + exceptionDetails.id = entity.exceptionDetails!!.id + currentDBSession().update(exceptionDetails) + } else { + currentDBSession().delete(entity.exceptionDetails) + } + } else if (exceptionDetails != null) { + currentDBSession().save(exceptionDetails) + } + return exceptionDetails + } + private fun createDBFlowException(errorState: ErrorState.Errored, now: Instant): DBFlowException { return errorState.errors.last().exception.let { DBFlowException( diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index c30044cb02..08be3bbf3e 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -3,6 +3,7 @@ package net.corda.node.services.persistence import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.FlowIORequest import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationDefaults @@ -34,7 +35,6 @@ import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test -import java.lang.IllegalStateException import java.time.Instant import kotlin.streams.toList import kotlin.test.assertEquals @@ -76,23 +76,23 @@ class DBCheckpointStorageTests { @Test(timeout = 300_000) fun `add new checkpoint`() { val (id, checkpoint) = newCheckpoint() - val serializedFlowState = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedFlowState = checkpoint.serializeFlowState() database.transaction { createMetadataRecord(checkpoint) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } database.transaction { + assertEquals(serializedFlowState, checkpointStorage.checkpoints().single().serializedFlowState) assertEquals( checkpoint, - checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpointStorage.checkpoints().single().deserialize() ) } newCheckpointStorage() database.transaction { assertEquals( checkpoint, - checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpointStorage.checkpoints().single().deserialize() ) session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).also { assertNotNull(it) @@ -101,11 +101,44 @@ class DBCheckpointStorageTests { } } + @Test(timeout = 300_000) + fun `update a checkpoint`() { + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.serializeFlowState() + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val logic: FlowLogic<*> = object : FlowLogic() { + override fun call(): String { + return "Updated flow logic" + } + } + val updatedCheckpoint = checkpoint.copy( + checkpointState = checkpoint.checkpointState.copy(numberOfSuspends = 20), + flowState = FlowState.Unstarted( + FlowStart.Explicit, + logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ), + progressStep = "I have made progress", + flowIoRequest = FlowIORequest.SendAndReceive::class.java + ) + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() + database.transaction { + checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) + } + database.transaction { + assertEquals( + updatedCheckpoint, + checkpointStorage.checkpoints().single().deserialize() + ) + } + } + @Test(timeout = 300_000) fun `remove checkpoint`() { val (id, checkpoint) = newCheckpoint() - val serializedFlowState = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedFlowState = checkpoint.serializeFlowState() database.transaction { createMetadataRecord(checkpoint) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) @@ -125,10 +158,9 @@ class DBCheckpointStorageTests { @Test(timeout = 300_000) fun `add and remove checkpoint in single commit operation`() { val (id, checkpoint) = newCheckpoint() - val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedFlowState = checkpoint.serializeFlowState() val (id2, checkpoint2) = newCheckpoint() - val serializedFlowState2 = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedFlowState2 = checkpoint.serializeFlowState() database.transaction { createMetadataRecord(checkpoint) createMetadataRecord(checkpoint2) @@ -139,14 +171,14 @@ class DBCheckpointStorageTests { database.transaction { assertEquals( checkpoint2, - checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpointStorage.checkpoints().single().deserialize() ) } newCheckpointStorage() database.transaction { assertEquals( checkpoint2, - checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpointStorage.checkpoints().single().deserialize() ) } } @@ -154,16 +186,14 @@ class DBCheckpointStorageTests { @Test(timeout = 300_000) fun `add two checkpoints then remove first one`() { val (id, firstCheckpoint) = newCheckpoint() - val serializedFirstFlowState = - firstCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedFirstFlowState = firstCheckpoint.serializeFlowState() database.transaction { createMetadataRecord(firstCheckpoint) checkpointStorage.addCheckpoint(id, firstCheckpoint, serializedFirstFlowState) } val (id2, secondCheckpoint) = newCheckpoint() - val serializedSecondFlowState = - secondCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedSecondFlowState = secondCheckpoint.serializeFlowState() database.transaction { createMetadataRecord(secondCheckpoint) checkpointStorage.addCheckpoint(id2, secondCheckpoint, serializedSecondFlowState) @@ -174,14 +204,14 @@ class DBCheckpointStorageTests { database.transaction { assertEquals( secondCheckpoint, - checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpointStorage.checkpoints().single().deserialize() ) } newCheckpointStorage() database.transaction { assertEquals( secondCheckpoint, - checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpointStorage.checkpoints().single().deserialize() ) } } @@ -189,8 +219,7 @@ class DBCheckpointStorageTests { @Test(timeout = 300_000) fun `add checkpoint and then remove after 'restart'`() { val (id, originalCheckpoint) = newCheckpoint() - val serializedOriginalFlowState = - originalCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedOriginalFlowState = originalCheckpoint.serializeFlowState() database.transaction { createMetadataRecord(originalCheckpoint) checkpointStorage.addCheckpoint(id, originalCheckpoint, serializedOriginalFlowState) @@ -200,7 +229,7 @@ class DBCheckpointStorageTests { checkpointStorage.checkpoints().single() } database.transaction { - assertEquals(originalCheckpoint, reconstructedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)) + assertEquals(originalCheckpoint, reconstructedCheckpoint.deserialize()) assertThat(reconstructedCheckpoint.serializedFlowState).isEqualTo(serializedOriginalFlowState) .isNotSameAs(serializedOriginalFlowState) } @@ -217,8 +246,7 @@ class DBCheckpointStorageTests { val mockServices = MockServices(emptyList(), ALICE.name) database.transaction { val (id, checkpoint) = newCheckpoint(1) - val serializedFlowState = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedFlowState = checkpoint.serializeFlowState() createMetadataRecord(checkpoint) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } @@ -229,8 +257,7 @@ class DBCheckpointStorageTests { database.transaction { val (id1, checkpoint1) = newCheckpoint(2) - val serializedFlowState1 = - checkpoint1.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val serializedFlowState1 = checkpoint1.serializeFlowState() createMetadataRecord(checkpoint1) checkpointStorage.addCheckpoint(id1, checkpoint1, serializedFlowState1) } @@ -243,107 +270,172 @@ class DBCheckpointStorageTests { } @Test(timeout = 300_000) - fun `checkpoint can be recreated from database record`() { - val (id, checkpoint) = newCheckpoint() - val serializedFlowState = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) - database.transaction { - createMetadataRecord(checkpoint) - checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) - } - database.transaction { - assertEquals(serializedFlowState, checkpointStorage.checkpoints().single().serializedFlowState) - } - database.transaction { - assertEquals(checkpoint, checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)) - } - } - - @Test(timeout = 300_000) - fun `update checkpoint with result information`() { + fun `update checkpoint with result information creates new result database record`() { val result = "This is the result" val (id, checkpoint) = newCheckpoint() val serializedFlowState = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpoint.serializeFlowState() database.transaction { createMetadataRecord(checkpoint) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } val updatedCheckpoint = checkpoint.copy(result = result) - val updatedSerializedFlowState = - updatedCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } database.transaction { assertEquals( result, - checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).result + checkpointStorage.getCheckpoint(id)!!.deserialize().result ) assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result) + val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java) + criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java)) + assertEquals(1, session.createQuery(criteria).resultList.size) } } @Test(timeout = 300_000) - fun `update checkpoint with error information`() { - val exception = IllegalStateException("I am a naughty exception") + fun `update checkpoint with result information updates existing result database record`() { + val result = "This is the result" + val somehowThereIsANewResult = "Another result (which should not be possible!)" val (id, checkpoint) = newCheckpoint() val serializedFlowState = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpoint.serializeFlowState() database.transaction { createMetadataRecord(checkpoint) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } - val updatedCheckpoint = checkpoint.copy( - errorState = ErrorState.Errored( - listOf( - FlowError( - 0, - exception - ) - ), 0, false - ) - ) - val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) - database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + val updatedCheckpoint = checkpoint.copy(result = result) + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() database.transaction { - // Checkpoint always returns clean error state when retrieved via [getCheckpoint] - assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) - assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails) + checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) + } + val updatedCheckpoint2 = checkpoint.copy(result = somehowThereIsANewResult) + val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState() + database.transaction { + checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2) + } + database.transaction { + assertEquals( + somehowThereIsANewResult, + checkpointStorage.getCheckpoint(id)!!.deserialize().result + ) + assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result) + val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java) + criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java)) + assertEquals(1, session.createQuery(criteria).resultList.size) } } @Test(timeout = 300_000) - fun `clean checkpoints clear out error information from the database`() { - val exception = IllegalStateException("I am a naughty exception") + fun `removing result information from checkpoint deletes existing result database record`() { + val result = "This is the result" val (id, checkpoint) = newCheckpoint() val serializedFlowState = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpoint.serializeFlowState() database.transaction { createMetadataRecord(checkpoint) checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } - val updatedCheckpoint = checkpoint.copy( - errorState = ErrorState.Errored( - listOf( - FlowError( - 0, - exception - ) - ), 0, false - ) - ) - val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val updatedCheckpoint = checkpoint.copy(result = result) + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() + database.transaction { + checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) + } + val updatedCheckpoint2 = checkpoint.copy(result = null) + val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState() + database.transaction { + checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2) + } + database.transaction { + assertNull(checkpointStorage.getCheckpoint(id)!!.deserialize().result) + assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result) + val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java) + criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java)) + assertEquals(0, session.createQuery(criteria).resultList.size) + } + } + + @Test(timeout = 300_000) + fun `update checkpoint with error information creates a new error database record`() { + val exception = IllegalStateException("I am a naughty exception") + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.serializeFlowState() + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.addError(exception) + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } database.transaction { // Checkpoint always returns clean error state when retrieved via [getCheckpoint] - assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean) + val exceptionDetails = session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails + assertNotNull(exceptionDetails) + assertEquals(exception::class.java, exceptionDetails!!.type) + assertEquals(exception.message, exceptionDetails.message) + val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java) + criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java)) + assertEquals(1, session.createQuery(criteria).resultList.size) + } + } + + @Test(timeout = 300_000) + fun `update checkpoint with new error information updates the existing error database record`() { + val illegalStateException = IllegalStateException("I am a naughty exception") + val illegalArgumentException = IllegalArgumentException("I am a very naughty exception") + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.serializeFlowState() + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint1 = checkpoint.addError(illegalStateException) + val updatedSerializedFlowState1 = updatedCheckpoint1.serializeFlowState() + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint1, updatedSerializedFlowState1) } + // Set back to clean + val updatedCheckpoint2 = checkpoint.addError(illegalArgumentException) + val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState() + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2) } + database.transaction { + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean) + val exceptionDetails = session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails + assertNotNull(exceptionDetails) + assertEquals(illegalArgumentException::class.java, exceptionDetails!!.type) + assertEquals(illegalArgumentException.message, exceptionDetails.message) + val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java) + criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java)) + assertEquals(1, session.createQuery(criteria).resultList.size) + } + } + + @Test(timeout = 300_000) + fun `clean checkpoints delete the error record from the database`() { + val exception = IllegalStateException("I am a naughty exception") + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.serializeFlowState() + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.addError(exception) + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + database.transaction { + // Checkpoint always returns clean error state when retrieved via [getCheckpoint] + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean) } // Set back to clean database.transaction { checkpointStorage.updateCheckpoint(id, checkpoint, serializedFlowState) } database.transaction { - assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean) assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails) + val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java) + criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java)) + assertEquals(0, session.createQuery(criteria).resultList.size) } } @@ -379,6 +471,27 @@ class DBCheckpointStorageTests { return id to checkpoint } + private fun Checkpoint.serializeFlowState(): SerializedBytes { + return flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + } + + private fun Checkpoint.Serialized.deserialize(): Checkpoint { + return deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + } + + private fun Checkpoint.addError(exception: Exception): Checkpoint { + return copy( + errorState = ErrorState.Errored( + listOf( + FlowError( + 0, + exception + ) + ), 0, false + ) + ) + } + private fun DatabaseTransaction.createMetadataRecord(checkpoint: Checkpoint) { val metadata = DBCheckpointStorage.DBFlowMetadata( invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value,