CORDA-3691 delete checkpoint record when complete (#6134)

* CORDA-3691 Delete checkpoint when flow finishes

The checkpoint and its related records in joined tables should be deleted
when a flow finishes.

Keeping these flows around will be completed in the future.

* CORDA-3691 Ignore some flow metadata tests

Ignore tests around recording the finish time of flow metadata records
since we are not currently keeping COMPLETED flows in the database.
This commit is contained in:
Dan Newton 2020-04-09 16:57:03 +01:00 committed by GitHub
parent 501b766e71
commit 896b0ab246
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 169 additions and 40 deletions

View File

@ -282,12 +282,18 @@ class DBCheckpointStorage(
}
override fun removeCheckpoint(id: StateMachineRunId): Boolean {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val delete = criteriaBuilder.createCriteriaDelete(DBFlowCheckpoint::class.java)
val root = delete.from(DBFlowCheckpoint::class.java)
delete.where(criteriaBuilder.equal(root.get<String>(DBFlowCheckpoint::id.name), id.uuid.toString()))
return session.createQuery(delete).executeUpdate() > 0
// This will be changed after performance tuning
return currentDBSession().let { session ->
session.find(DBFlowCheckpoint::class.java, id.uuid.toString())?.run {
result?.let { session.delete(result) }
exceptionDetails?.let { session.delete(exceptionDetails) }
session.delete(blob)
session.delete(this)
// The metadata foreign key might be the wrong way around
session.delete(flowMetadata)
true
}
} ?: false
}
override fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? {
@ -310,7 +316,7 @@ class DBCheckpointStorage(
val criteriaQuery = criteriaBuilder.createQuery(DBFlowCheckpoint::class.java)
val root = criteriaQuery.from(DBFlowCheckpoint::class.java)
criteriaQuery.select(root)
.where(criteriaBuilder.not(root.get<FlowStatus>(DBFlowCheckpoint::status.name).`in`(NOT_RUNNABLE_CHECKPOINTS)))
.where(criteriaBuilder.not(root.get<FlowStatus>(DBFlowCheckpoint::status.name).`in`(NOT_RUNNABLE_CHECKPOINTS)))
return session.createQuery(criteriaQuery).stream().map {
StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint()
}
@ -513,7 +519,7 @@ class DBCheckpointStorage(
private fun InvocationContext.getFlowParameters(): List<Any?> {
// Only RPC flows have parameters which are found in index 1
return if(arguments.isNotEmpty()) {
return if (arguments.isNotEmpty()) {
uncheckedCast<Any?, Array<Any?>>(arguments[1]).toList()
} else {
emptyList()
@ -540,7 +546,7 @@ class DBCheckpointStorage(
return serialize(context = SerializationDefaults.STORAGE_CONTEXT)
}
private fun Checkpoint.isFinished() = when(status) {
private fun Checkpoint.isFinished() = when (status) {
FlowStatus.COMPLETED, FlowStatus.KILLED, FlowStatus.FAILED -> true
else -> false
}

View File

@ -228,9 +228,11 @@ class TopLevelTransition(
isRemoved = true
)
val allSourceSessionIds = checkpoint.checkpointState.sessions.keys
if (currentState.isAnyCheckpointPersisted) {
actions.add(Action.RemoveCheckpoint(context.id))
}
actions.addAll(arrayOf(
Action.PersistCheckpoint(context.id, currentState.checkpoint, currentState.isAnyCheckpointPersisted),
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(event.softLocksId),
Action.CommitTransaction,
Action.AcknowledgeMessages(pendingDeduplicationHandlers),

View File

@ -22,6 +22,7 @@ import net.corda.node.services.statemachine.SubFlowVersion
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
@ -153,19 +154,34 @@ class DBCheckpointStorageTests {
}
database.transaction {
assertEquals(
completedCheckpoint,
checkpointStorage.checkpoints().single().deserialize()
completedCheckpoint,
checkpointStorage.checkpoints().single().deserialize()
)
}
}
@Test(timeout = 300_000)
fun `remove checkpoint`() {
fun `removing a checkpoint deletes from all checkpoint tables`() {
val exception = IllegalStateException("I am a naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.addError(exception).copy(result = "The result")
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) }
database.transaction {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
// The result not stored yet
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(2, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
@ -176,6 +192,100 @@ class DBCheckpointStorageTests {
database.transaction {
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
}
@Test(timeout = 300_000)
fun `removing a checkpoint when there is no result does not fail`() {
val exception = IllegalStateException("I am a naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.addError(exception)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) }
database.transaction {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
// The result not stored yet
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(2, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
database.transaction {
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
newCheckpointStorage()
database.transaction {
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
}
@Test(timeout = 300_000)
fun `removing a checkpoint when there is no exception does not fail`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.copy(result = "The result")
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) }
database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
// The result not stored yet
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(2, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
database.transaction {
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
newCheckpointStorage()
database.transaction {
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
}
@Test(timeout = 300_000)
@ -346,9 +456,7 @@ class DBCheckpointStorageTests {
checkpointStorage.getCheckpoint(id)!!.deserialize().result
)
assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java))
assertEquals(1, session.createQuery(criteria).resultList.size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
}
}
@ -379,9 +487,7 @@ class DBCheckpointStorageTests {
checkpointStorage.getCheckpoint(id)!!.deserialize().result
)
assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java))
assertEquals(1, session.createQuery(criteria).resultList.size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
}
}
@ -407,9 +513,7 @@ class DBCheckpointStorageTests {
database.transaction {
assertNull(checkpointStorage.getCheckpoint(id)!!.deserialize().result)
assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java))
assertEquals(0, session.createQuery(criteria).resultList.size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
}
}
@ -431,9 +535,7 @@ class DBCheckpointStorageTests {
assertNotNull(exceptionDetails)
assertEquals(exception::class.java.name, exceptionDetails!!.type)
assertEquals(exception.message, exceptionDetails.message)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java))
assertEquals(1, session.createQuery(criteria).resultList.size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
}
}
@ -459,9 +561,7 @@ class DBCheckpointStorageTests {
assertNotNull(exceptionDetails)
assertEquals(illegalArgumentException::class.java.name, exceptionDetails!!.type)
assertEquals(illegalArgumentException.message, exceptionDetails.message)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java))
assertEquals(1, session.createQuery(criteria).resultList.size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
}
}
@ -485,9 +585,7 @@ class DBCheckpointStorageTests {
database.transaction {
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java))
assertEquals(0, session.createQuery(criteria).resultList.size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
}
}
@ -532,7 +630,7 @@ class DBCheckpointStorageTests {
database.transaction {
val newCheckpoint = checkpoint.copy(progressStep = longString)
val serializedFlowState = newCheckpoint.flowState.checkpointSerialize(
context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT
context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT
)
checkpointStorage.updateCheckpoint(id, newCheckpoint, serializedFlowState)
}
@ -557,7 +655,7 @@ class DBCheckpointStorageTests {
database.transaction {
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), runnable, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), hospitalized, serializedFlowState)
@ -706,4 +804,10 @@ class DBCheckpointStorageTests {
)
)
}
private inline fun <reified T> DatabaseTransaction.findRecordsFromDatabase(): List<T> {
val criteria = session.criteriaBuilder.createQuery(T::class.java)
criteria.select(criteria.from(T::class.java))
return session.createQuery(criteria).resultList
}
}

View File

@ -31,6 +31,7 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.queryBy
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
@ -43,6 +44,10 @@ import net.corda.core.utilities.unwrap
import net.corda.node.services.persistence.CheckpointPerformanceRecorder
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.ALICE_NAME
@ -70,6 +75,7 @@ import org.junit.Assert.assertNotEquals
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import rx.Notification
import rx.Observable
@ -77,13 +83,12 @@ import java.sql.SQLTransientConnectionException
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.ArrayList
import java.util.concurrent.TimeoutException
import java.util.function.Predicate
import kotlin.reflect.KClass
import kotlin.streams.toList
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class FlowFrameworkTests {
@ -348,7 +353,7 @@ class FlowFrameworkTests {
//We should update this test when we do the work to persists the flow result.
@Test(timeout = 300_000)
fun `Flow status is set to completed in database when the flow finishes and serialised flow state is null`() {
fun `Checkpoint and all its related records are deleted when the flow finishes`() {
val terminationSignal = Semaphore(0)
val flow = aliceNode.services.startFlow(NoOpFlow( terminateUponSignal = terminationSignal))
mockNet.waitQuiescent() // current thread needs to wait fiber running on a different thread, has reached the blocking point
@ -362,12 +367,15 @@ class FlowFrameworkTests {
mockNet.waitQuiescent()
aliceNode.database.transaction {
val checkpoint = dbCheckpointStorage.getCheckpoint(flow.id)
assertNull(checkpoint!!.result)
assertNull(checkpoint.serializedFlowState)
assertEquals(Checkpoint.FlowStatus.COMPLETED, checkpoint.status)
assertNull(checkpoint)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
}
// Ignoring test since completed flows are not currently keeping their checkpoints in the database
@Ignore
@Test(timeout = 300_000)
fun `Flow metadata finish time is set in database when the flow finishes`() {
val terminationSignal = Semaphore(0)
@ -821,6 +829,12 @@ class FlowFrameworkTests {
assertEquals(null, persistedException)
}
private inline fun <reified T> DatabaseTransaction.findRecordsFromDatabase(): List<T> {
val criteria = session.criteriaBuilder.createQuery(T::class.java)
criteria.select(criteria.from(T::class.java))
return session.createQuery(criteria).resultList
}
//region Helpers
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)

View File

@ -43,6 +43,7 @@ import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.time.Instant
import java.util.concurrent.CompletableFuture
@ -354,6 +355,8 @@ class FlowMetadataRecordingTest {
}
}
// Ignoring test since completed flows are not currently keeping their checkpoints in the database
@Ignore
@Test(timeout = 300_000)
fun `flows have their finish time recorded when completed`() {
driver(DriverParameters(startNodesInProcess = true)) {