NOTICK Do not cascade checkpoint tables (#6040)

Do not cascade updates to checkpoint error and result tables to hopefully
improve database performance moving forward. Because the joined tables
are no longer being updated by updating the main `DBFlowCheckpoint` entity,
they must be created/updated/deleted manually.

The checkpoint blobs still cascade as they pretty much always evolve in 
tandem with the main checkpoint table.
This commit is contained in:
Dan Newton 2020-03-09 10:01:30 +00:00 committed by GitHub
parent ab000e0533
commit d5e84a4f93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 257 additions and 93 deletions

View File

@ -11,9 +11,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.node.services.vault.SessionScope
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.Checkpoint.FlowStatus
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
@ -71,7 +69,7 @@ class CordaPersistenceServiceTests {
hmac = ByteArray(16),
persistedInstant = now
),
result = DBCheckpointStorage.DBFlowResult(value = ByteArray(16), persistedInstant = now),
result = null,
exceptionDetails = null,
status = FlowStatus.RUNNABLE,
compatible = false,

View File

@ -37,6 +37,7 @@ import javax.persistence.OneToOne
/**
* Simple checkpoint key value storage in DB.
*/
@Suppress("TooManyFunctions")
class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointPerformanceRecorder) : CheckpointStorage {
companion object {
@ -83,11 +84,11 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
@JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id")
var blob: DBFlowCheckpointBlob,
@OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true)
@OneToOne(fetch = FetchType.LAZY, optional = true)
@JoinColumn(name = "result_id", referencedColumnName = "id")
var result: DBFlowResult?,
@OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true)
@OneToOne(fetch = FetchType.LAZY, optional = true)
@JoinColumn(name = "error_id", referencedColumnName = "id")
var exceptionDetails: DBFlowException?,
@ -117,7 +118,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE)
@Column(name = "id", nullable = false)
private var id: Long = 0,
var id: Long = 0,
@Type(type = "corda-blob")
@Column(name = "checkpoint_value", nullable = false)
@ -141,7 +142,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
@Id
@Column(name = "id", nullable = false)
@GeneratedValue(strategy = GenerationType.SEQUENCE)
private var id: Long = 0,
var id: Long = 0,
@Type(type = "corda-blob")
@Column(name = "result_value", nullable = false)
@ -157,7 +158,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
@Id
@Column(name = "id", nullable = false)
@GeneratedValue(strategy = GenerationType.SEQUENCE)
private var id: Long = 0,
var id: Long = 0,
@Column(name = "type", nullable = false)
var type: Class<out Throwable>,
@ -319,14 +320,16 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
val flowId = id.uuid.toString()
val now = Instant.now()
// Load the previous entity from the hibernate cache so the meta data join does not get updated
val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId)
val serializedCheckpointState = checkpoint.checkpointState.storageSerialize()
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now)
val result = checkpoint.result?.let { createDBFlowResult(it, now) }
val exceptionDetails = (checkpoint.errorState as? ErrorState.Errored)?.let { createDBFlowException(it, now) }
// Load the previous entity from the hibernate cache so the meta data join does not get updated
val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId)
val result = updateDBFlowResult(entity, checkpoint, now)
val exceptionDetails = updateDBFlowException(entity, checkpoint, now)
return entity.apply {
this.blob = blob
this.result = result
@ -354,6 +357,31 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
)
}
/**
* Creates, updates or deletes the result related to the current flow/checkpoint.
*
* This is needed because updates are not cascading via Hibernate, therefore operations must be handled manually.
*
* A [DBFlowResult] is created if [DBFlowCheckpoint.result] does not exist and the [Checkpoint] has a result..
* The existing [DBFlowResult] is updated if [DBFlowCheckpoint.result] exists and the [Checkpoint] has a result.
* The existing [DBFlowResult] is deleted if [DBFlowCheckpoint.result] exists and the [Checkpoint] has no result.
* Nothing happens if both [DBFlowCheckpoint] and [Checkpoint] do not have a result.
*/
private fun updateDBFlowResult(entity: DBFlowCheckpoint, checkpoint: Checkpoint, now: Instant): DBFlowResult? {
val result = checkpoint.result?.let { createDBFlowResult(it, now) }
if (entity.result != null) {
if (result != null) {
result.id = entity.result!!.id
currentDBSession().update(result)
} else {
currentDBSession().delete(entity.result)
}
} else if (result != null) {
currentDBSession().save(result)
}
return result
}
private fun createDBFlowResult(result: Any, now: Instant): DBFlowResult {
return DBFlowResult(
value = result.storageSerialize().bytes,
@ -361,6 +389,31 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
)
}
/**
* Creates, updates or deletes the error related to the current flow/checkpoint.
*
* This is needed because updates are not cascading via Hibernate, therefore operations must be handled manually.
*
* A [DBFlowException] is created if [DBFlowCheckpoint.exceptionDetails] does not exist and the [Checkpoint] has an error attached to it.
* The existing [DBFlowException] is updated if [DBFlowCheckpoint.exceptionDetails] exists and the [Checkpoint] has an error.
* The existing [DBFlowException] is deleted if [DBFlowCheckpoint.exceptionDetails] exists and the [Checkpoint] has no error.
* Nothing happens if both [DBFlowCheckpoint] and [Checkpoint] are related to no errors.
*/
private fun updateDBFlowException(entity: DBFlowCheckpoint, checkpoint: Checkpoint, now: Instant): DBFlowException? {
val exceptionDetails = (checkpoint.errorState as? ErrorState.Errored)?.let { createDBFlowException(it, now) }
if (entity.exceptionDetails != null) {
if (exceptionDetails != null) {
exceptionDetails.id = entity.exceptionDetails!!.id
currentDBSession().update(exceptionDetails)
} else {
currentDBSession().delete(entity.exceptionDetails)
}
} else if (exceptionDetails != null) {
currentDBSession().save(exceptionDetails)
}
return exceptionDetails
}
private fun createDBFlowException(errorState: ErrorState.Errored, now: Instant): DBFlowException {
return errorState.errors.last().exception.let {
DBFlowException(

View File

@ -3,6 +3,7 @@ package net.corda.node.services.persistence
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
@ -34,7 +35,6 @@ import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.lang.IllegalStateException
import java.time.Instant
import kotlin.streams.toList
import kotlin.test.assertEquals
@ -76,23 +76,23 @@ class DBCheckpointStorageTests {
@Test(timeout = 300_000)
fun `add new checkpoint`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
database.transaction {
assertEquals(serializedFlowState, checkpointStorage.checkpoints().single().serializedFlowState)
assertEquals(
checkpoint,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.checkpoints().single().deserialize()
)
}
newCheckpointStorage()
database.transaction {
assertEquals(
checkpoint,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.checkpoints().single().deserialize()
)
session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).also {
assertNotNull(it)
@ -101,11 +101,44 @@ class DBCheckpointStorageTests {
}
}
@Test(timeout = 300_000)
fun `update a checkpoint`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val logic: FlowLogic<*> = object : FlowLogic<String>() {
override fun call(): String {
return "Updated flow logic"
}
}
val updatedCheckpoint = checkpoint.copy(
checkpointState = checkpoint.checkpointState.copy(numberOfSuspends = 20),
flowState = FlowState.Unstarted(
FlowStart.Explicit,
logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
),
progressStep = "I have made progress",
flowIoRequest = FlowIORequest.SendAndReceive::class.java
)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState)
}
database.transaction {
assertEquals(
updatedCheckpoint,
checkpointStorage.checkpoints().single().deserialize()
)
}
}
@Test(timeout = 300_000)
fun `remove checkpoint`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
@ -125,10 +158,9 @@ class DBCheckpointStorageTests {
@Test(timeout = 300_000)
fun `add and remove checkpoint in single commit operation`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedFlowState = checkpoint.serializeFlowState()
val (id2, checkpoint2) = newCheckpoint()
val serializedFlowState2 =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedFlowState2 = checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
createMetadataRecord(checkpoint2)
@ -139,14 +171,14 @@ class DBCheckpointStorageTests {
database.transaction {
assertEquals(
checkpoint2,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.checkpoints().single().deserialize()
)
}
newCheckpointStorage()
database.transaction {
assertEquals(
checkpoint2,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.checkpoints().single().deserialize()
)
}
}
@ -154,16 +186,14 @@ class DBCheckpointStorageTests {
@Test(timeout = 300_000)
fun `add two checkpoints then remove first one`() {
val (id, firstCheckpoint) = newCheckpoint()
val serializedFirstFlowState =
firstCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedFirstFlowState = firstCheckpoint.serializeFlowState()
database.transaction {
createMetadataRecord(firstCheckpoint)
checkpointStorage.addCheckpoint(id, firstCheckpoint, serializedFirstFlowState)
}
val (id2, secondCheckpoint) = newCheckpoint()
val serializedSecondFlowState =
secondCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedSecondFlowState = secondCheckpoint.serializeFlowState()
database.transaction {
createMetadataRecord(secondCheckpoint)
checkpointStorage.addCheckpoint(id2, secondCheckpoint, serializedSecondFlowState)
@ -174,14 +204,14 @@ class DBCheckpointStorageTests {
database.transaction {
assertEquals(
secondCheckpoint,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.checkpoints().single().deserialize()
)
}
newCheckpointStorage()
database.transaction {
assertEquals(
secondCheckpoint,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.checkpoints().single().deserialize()
)
}
}
@ -189,8 +219,7 @@ class DBCheckpointStorageTests {
@Test(timeout = 300_000)
fun `add checkpoint and then remove after 'restart'`() {
val (id, originalCheckpoint) = newCheckpoint()
val serializedOriginalFlowState =
originalCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedOriginalFlowState = originalCheckpoint.serializeFlowState()
database.transaction {
createMetadataRecord(originalCheckpoint)
checkpointStorage.addCheckpoint(id, originalCheckpoint, serializedOriginalFlowState)
@ -200,7 +229,7 @@ class DBCheckpointStorageTests {
checkpointStorage.checkpoints().single()
}
database.transaction {
assertEquals(originalCheckpoint, reconstructedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT))
assertEquals(originalCheckpoint, reconstructedCheckpoint.deserialize())
assertThat(reconstructedCheckpoint.serializedFlowState).isEqualTo(serializedOriginalFlowState)
.isNotSameAs(serializedOriginalFlowState)
}
@ -217,8 +246,7 @@ class DBCheckpointStorageTests {
val mockServices = MockServices(emptyList(), ALICE.name)
database.transaction {
val (id, checkpoint) = newCheckpoint(1)
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedFlowState = checkpoint.serializeFlowState()
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
@ -229,8 +257,7 @@ class DBCheckpointStorageTests {
database.transaction {
val (id1, checkpoint1) = newCheckpoint(2)
val serializedFlowState1 =
checkpoint1.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedFlowState1 = checkpoint1.serializeFlowState()
createMetadataRecord(checkpoint1)
checkpointStorage.addCheckpoint(id1, checkpoint1, serializedFlowState1)
}
@ -243,107 +270,172 @@ class DBCheckpointStorageTests {
}
@Test(timeout = 300_000)
fun `checkpoint can be recreated from database record`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
database.transaction {
assertEquals(serializedFlowState, checkpointStorage.checkpoints().single().serializedFlowState)
}
database.transaction {
assertEquals(checkpoint, checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT))
}
}
@Test(timeout = 300_000)
fun `update checkpoint with result information`() {
fun `update checkpoint with result information creates new result database record`() {
val result = "This is the result"
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.copy(result = result)
val updatedSerializedFlowState =
updatedCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState)
}
database.transaction {
assertEquals(
result,
checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).result
checkpointStorage.getCheckpoint(id)!!.deserialize().result
)
assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java))
assertEquals(1, session.createQuery(criteria).resultList.size)
}
}
@Test(timeout = 300_000)
fun `update checkpoint with error information`() {
val exception = IllegalStateException("I am a naughty exception")
fun `update checkpoint with result information updates existing result database record`() {
val result = "This is the result"
val somehowThereIsANewResult = "Another result (which should not be possible!)"
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.copy(
errorState = ErrorState.Errored(
listOf(
FlowError(
0,
exception
)
), 0, false
)
)
val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) }
val updatedCheckpoint = checkpoint.copy(result = result)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {
// Checkpoint always returns clean error state when retrieved via [getCheckpoint]
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean)
assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails)
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState)
}
val updatedCheckpoint2 = checkpoint.copy(result = somehowThereIsANewResult)
val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2)
}
database.transaction {
assertEquals(
somehowThereIsANewResult,
checkpointStorage.getCheckpoint(id)!!.deserialize().result
)
assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java))
assertEquals(1, session.createQuery(criteria).resultList.size)
}
}
@Test(timeout = 300_000)
fun `clean checkpoints clear out error information from the database`() {
val exception = IllegalStateException("I am a naughty exception")
fun `removing result information from checkpoint deletes existing result database record`() {
val result = "This is the result"
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.copy(
errorState = ErrorState.Errored(
listOf(
FlowError(
0,
exception
)
), 0, false
)
)
val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val updatedCheckpoint = checkpoint.copy(result = result)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState)
}
val updatedCheckpoint2 = checkpoint.copy(result = null)
val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2)
}
database.transaction {
assertNull(checkpointStorage.getCheckpoint(id)!!.deserialize().result)
assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java))
assertEquals(0, session.createQuery(criteria).resultList.size)
}
}
@Test(timeout = 300_000)
fun `update checkpoint with error information creates a new error database record`() {
val exception = IllegalStateException("I am a naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.addError(exception)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) }
database.transaction {
// Checkpoint always returns clean error state when retrieved via [getCheckpoint]
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean)
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
val exceptionDetails = session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails
assertNotNull(exceptionDetails)
assertEquals(exception::class.java, exceptionDetails!!.type)
assertEquals(exception.message, exceptionDetails.message)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java))
assertEquals(1, session.createQuery(criteria).resultList.size)
}
}
@Test(timeout = 300_000)
fun `update checkpoint with new error information updates the existing error database record`() {
val illegalStateException = IllegalStateException("I am a naughty exception")
val illegalArgumentException = IllegalArgumentException("I am a very naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint1 = checkpoint.addError(illegalStateException)
val updatedSerializedFlowState1 = updatedCheckpoint1.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint1, updatedSerializedFlowState1) }
// Set back to clean
val updatedCheckpoint2 = checkpoint.addError(illegalArgumentException)
val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2) }
database.transaction {
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
val exceptionDetails = session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails
assertNotNull(exceptionDetails)
assertEquals(illegalArgumentException::class.java, exceptionDetails!!.type)
assertEquals(illegalArgumentException.message, exceptionDetails.message)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java))
assertEquals(1, session.createQuery(criteria).resultList.size)
}
}
@Test(timeout = 300_000)
fun `clean checkpoints delete the error record from the database`() {
val exception = IllegalStateException("I am a naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.addError(exception)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) }
database.transaction {
// Checkpoint always returns clean error state when retrieved via [getCheckpoint]
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
}
// Set back to clean
database.transaction { checkpointStorage.updateCheckpoint(id, checkpoint, serializedFlowState) }
database.transaction {
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean)
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java))
assertEquals(0, session.createQuery(criteria).resultList.size)
}
}
@ -379,6 +471,27 @@ class DBCheckpointStorageTests {
return id to checkpoint
}
private fun Checkpoint.serializeFlowState(): SerializedBytes<FlowState> {
return flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
}
private fun Checkpoint.Serialized.deserialize(): Checkpoint {
return deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
}
private fun Checkpoint.addError(exception: Exception): Checkpoint {
return copy(
errorState = ErrorState.Errored(
listOf(
FlowError(
0,
exception
)
), 0, false
)
)
}
private fun DatabaseTransaction.createMetadataRecord(checkpoint: Checkpoint) {
val metadata = DBCheckpointStorage.DBFlowMetadata(
invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value,