diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index c67b8295e0..abcdde622c 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -282,12 +282,18 @@ class DBCheckpointStorage( } override fun removeCheckpoint(id: StateMachineRunId): Boolean { - val session = currentDBSession() - val criteriaBuilder = session.criteriaBuilder - val delete = criteriaBuilder.createCriteriaDelete(DBFlowCheckpoint::class.java) - val root = delete.from(DBFlowCheckpoint::class.java) - delete.where(criteriaBuilder.equal(root.get(DBFlowCheckpoint::id.name), id.uuid.toString())) - return session.createQuery(delete).executeUpdate() > 0 + // This will be changed after performance tuning + return currentDBSession().let { session -> + session.find(DBFlowCheckpoint::class.java, id.uuid.toString())?.run { + result?.let { session.delete(result) } + exceptionDetails?.let { session.delete(exceptionDetails) } + session.delete(blob) + session.delete(this) + // The metadata foreign key might be the wrong way around + session.delete(flowMetadata) + true + } + } ?: false } override fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? { @@ -310,7 +316,7 @@ class DBCheckpointStorage( val criteriaQuery = criteriaBuilder.createQuery(DBFlowCheckpoint::class.java) val root = criteriaQuery.from(DBFlowCheckpoint::class.java) criteriaQuery.select(root) - .where(criteriaBuilder.not(root.get(DBFlowCheckpoint::status.name).`in`(NOT_RUNNABLE_CHECKPOINTS))) + .where(criteriaBuilder.not(root.get(DBFlowCheckpoint::status.name).`in`(NOT_RUNNABLE_CHECKPOINTS))) return session.createQuery(criteriaQuery).stream().map { StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint() } @@ -513,7 +519,7 @@ class DBCheckpointStorage( private fun InvocationContext.getFlowParameters(): List { // Only RPC flows have parameters which are found in index 1 - return if(arguments.isNotEmpty()) { + return if (arguments.isNotEmpty()) { uncheckedCast>(arguments[1]).toList() } else { emptyList() @@ -540,7 +546,7 @@ class DBCheckpointStorage( return serialize(context = SerializationDefaults.STORAGE_CONTEXT) } - private fun Checkpoint.isFinished() = when(status) { + private fun Checkpoint.isFinished() = when (status) { FlowStatus.COMPLETED, FlowStatus.KILLED, FlowStatus.FAILED -> true else -> false } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 79598910c0..37ac11489a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -228,9 +228,11 @@ class TopLevelTransition( isRemoved = true ) val allSourceSessionIds = checkpoint.checkpointState.sessions.keys + if (currentState.isAnyCheckpointPersisted) { + actions.add(Action.RemoveCheckpoint(context.id)) + } actions.addAll(arrayOf( - Action.PersistCheckpoint(context.id, currentState.checkpoint, currentState.isAnyCheckpointPersisted), - Action.PersistDeduplicationFacts(pendingDeduplicationHandlers), + Action.PersistDeduplicationFacts(pendingDeduplicationHandlers), Action.ReleaseSoftLocks(event.softLocksId), Action.CommitTransaction, Action.AcknowledgeMessages(pendingDeduplicationHandlers), diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index 0da98cf202..8ec79aa439 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -22,6 +22,7 @@ import net.corda.node.services.statemachine.SubFlowVersion import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity @@ -153,19 +154,34 @@ class DBCheckpointStorageTests { } database.transaction { assertEquals( - completedCheckpoint, - checkpointStorage.checkpoints().single().deserialize() + completedCheckpoint, + checkpointStorage.checkpoints().single().deserialize() ) } } @Test(timeout = 300_000) - fun `remove checkpoint`() { + fun `removing a checkpoint deletes from all checkpoint tables`() { + val exception = IllegalStateException("I am a naughty exception") val (id, checkpoint) = newCheckpoint() val serializedFlowState = checkpoint.serializeFlowState() database.transaction { checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } + val updatedCheckpoint = checkpoint.addError(exception).copy(result = "The result") + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + + database.transaction { + assertEquals(1, findRecordsFromDatabase().size) + // The result not stored yet + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + // The saving of checkpoint blobs needs to be fixed + assertEquals(2, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + } + database.transaction { checkpointStorage.removeCheckpoint(id) } @@ -176,6 +192,100 @@ class DBCheckpointStorageTests { database.transaction { assertThat(checkpointStorage.checkpoints()).isEmpty() } + + database.transaction { + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + // The saving of checkpoint blobs needs to be fixed + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + } + } + + @Test(timeout = 300_000) + fun `removing a checkpoint when there is no result does not fail`() { + val exception = IllegalStateException("I am a naughty exception") + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.serializeFlowState() + database.transaction { + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.addError(exception) + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + + database.transaction { + assertEquals(1, findRecordsFromDatabase().size) + // The result not stored yet + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + // The saving of checkpoint blobs needs to be fixed + assertEquals(2, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + } + + database.transaction { + checkpointStorage.removeCheckpoint(id) + } + database.transaction { + assertThat(checkpointStorage.checkpoints()).isEmpty() + } + newCheckpointStorage() + database.transaction { + assertThat(checkpointStorage.checkpoints()).isEmpty() + } + + database.transaction { + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + // The saving of checkpoint blobs needs to be fixed + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + } + } + + @Test(timeout = 300_000) + fun `removing a checkpoint when there is no exception does not fail`() { + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.serializeFlowState() + database.transaction { + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.copy(result = "The result") + val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState() + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + + database.transaction { + assertEquals(0, findRecordsFromDatabase().size) + // The result not stored yet + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + // The saving of checkpoint blobs needs to be fixed + assertEquals(2, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + } + + database.transaction { + checkpointStorage.removeCheckpoint(id) + } + database.transaction { + assertThat(checkpointStorage.checkpoints()).isEmpty() + } + newCheckpointStorage() + database.transaction { + assertThat(checkpointStorage.checkpoints()).isEmpty() + } + + database.transaction { + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + // The saving of checkpoint blobs needs to be fixed + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + } } @Test(timeout = 300_000) @@ -346,9 +456,7 @@ class DBCheckpointStorageTests { checkpointStorage.getCheckpoint(id)!!.deserialize().result ) assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result) - val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java) - criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java)) - assertEquals(1, session.createQuery(criteria).resultList.size) + assertEquals(1, findRecordsFromDatabase().size) } } @@ -379,9 +487,7 @@ class DBCheckpointStorageTests { checkpointStorage.getCheckpoint(id)!!.deserialize().result ) assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result) - val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java) - criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java)) - assertEquals(1, session.createQuery(criteria).resultList.size) + assertEquals(1, findRecordsFromDatabase().size) } } @@ -407,9 +513,7 @@ class DBCheckpointStorageTests { database.transaction { assertNull(checkpointStorage.getCheckpoint(id)!!.deserialize().result) assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result) - val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowResult::class.java) - criteria.select(criteria.from(DBCheckpointStorage.DBFlowResult::class.java)) - assertEquals(0, session.createQuery(criteria).resultList.size) + assertEquals(0, findRecordsFromDatabase().size) } } @@ -431,9 +535,7 @@ class DBCheckpointStorageTests { assertNotNull(exceptionDetails) assertEquals(exception::class.java.name, exceptionDetails!!.type) assertEquals(exception.message, exceptionDetails.message) - val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java) - criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java)) - assertEquals(1, session.createQuery(criteria).resultList.size) + assertEquals(1, findRecordsFromDatabase().size) } } @@ -459,9 +561,7 @@ class DBCheckpointStorageTests { assertNotNull(exceptionDetails) assertEquals(illegalArgumentException::class.java.name, exceptionDetails!!.type) assertEquals(illegalArgumentException.message, exceptionDetails.message) - val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java) - criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java)) - assertEquals(1, session.createQuery(criteria).resultList.size) + assertEquals(1, findRecordsFromDatabase().size) } } @@ -485,9 +585,7 @@ class DBCheckpointStorageTests { database.transaction { assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean) assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails) - val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java) - criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java)) - assertEquals(0, session.createQuery(criteria).resultList.size) + assertEquals(0, findRecordsFromDatabase().size) } } @@ -532,7 +630,7 @@ class DBCheckpointStorageTests { database.transaction { val newCheckpoint = checkpoint.copy(progressStep = longString) val serializedFlowState = newCheckpoint.flowState.checkpointSerialize( - context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT + context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT ) checkpointStorage.updateCheckpoint(id, newCheckpoint, serializedFlowState) } @@ -557,7 +655,7 @@ class DBCheckpointStorageTests { database.transaction { val serializedFlowState = - checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), runnable, serializedFlowState) checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), hospitalized, serializedFlowState) @@ -706,4 +804,10 @@ class DBCheckpointStorageTests { ) ) } + + private inline fun DatabaseTransaction.findRecordsFromDatabase(): List { + val criteria = session.criteriaBuilder.createQuery(T::class.java) + criteria.select(criteria.from(T::class.java)) + return session.createQuery(criteria).resultList + } } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 9eefaec2e2..05955f343b 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -31,6 +31,7 @@ import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.queryBy import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize +import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.serialize import net.corda.core.toFuture import net.corda.core.transactions.SignedTransaction @@ -43,6 +44,10 @@ import net.corda.core.utilities.unwrap import net.corda.node.services.persistence.CheckpointPerformanceRecorder import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.checkpoints +import net.corda.nodeapi.internal.persistence.DatabaseTransaction +import net.corda.nodeapi.internal.persistence.contextDatabase +import net.corda.nodeapi.internal.persistence.contextTransaction +import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyState import net.corda.testing.core.ALICE_NAME @@ -70,6 +75,7 @@ import org.junit.Assert.assertNotEquals import org.junit.Assert.assertNotNull import org.junit.Assert.assertNull import org.junit.Before +import org.junit.Ignore import org.junit.Test import rx.Notification import rx.Observable @@ -77,13 +83,12 @@ import java.sql.SQLTransientConnectionException import java.time.Clock import java.time.Duration import java.time.Instant -import java.util.* +import java.util.ArrayList import java.util.concurrent.TimeoutException import java.util.function.Predicate import kotlin.reflect.KClass import kotlin.streams.toList import kotlin.test.assertFailsWith -import kotlin.test.assertNotNull import kotlin.test.assertTrue class FlowFrameworkTests { @@ -348,7 +353,7 @@ class FlowFrameworkTests { //We should update this test when we do the work to persists the flow result. @Test(timeout = 300_000) - fun `Flow status is set to completed in database when the flow finishes and serialised flow state is null`() { + fun `Checkpoint and all its related records are deleted when the flow finishes`() { val terminationSignal = Semaphore(0) val flow = aliceNode.services.startFlow(NoOpFlow( terminateUponSignal = terminationSignal)) mockNet.waitQuiescent() // current thread needs to wait fiber running on a different thread, has reached the blocking point @@ -362,12 +367,15 @@ class FlowFrameworkTests { mockNet.waitQuiescent() aliceNode.database.transaction { val checkpoint = dbCheckpointStorage.getCheckpoint(flow.id) - assertNull(checkpoint!!.result) - assertNull(checkpoint.serializedFlowState) - assertEquals(Checkpoint.FlowStatus.COMPLETED, checkpoint.status) + assertNull(checkpoint) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) } } + // Ignoring test since completed flows are not currently keeping their checkpoints in the database + @Ignore @Test(timeout = 300_000) fun `Flow metadata finish time is set in database when the flow finishes`() { val terminationSignal = Semaphore(0) @@ -821,6 +829,12 @@ class FlowFrameworkTests { assertEquals(null, persistedException) } + private inline fun DatabaseTransaction.findRecordsFromDatabase(): List { + val criteria = session.criteriaBuilder.createQuery(T::class.java) + criteria.select(criteria.from(T::class.java)) + return session.createQuery(criteria).resultList + } + //region Helpers private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt index 2825a05b9f..b1246c739b 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt @@ -43,6 +43,7 @@ import net.corda.testing.driver.driver import net.corda.testing.node.User import org.assertj.core.api.Assertions.assertThat import org.junit.Before +import org.junit.Ignore import org.junit.Test import java.time.Instant import java.util.concurrent.CompletableFuture @@ -354,6 +355,8 @@ class FlowMetadataRecordingTest { } } + // Ignoring test since completed flows are not currently keeping their checkpoints in the database + @Ignore @Test(timeout = 300_000) fun `flows have their finish time recorded when completed`() { driver(DriverParameters(startNodesInProcess = true)) {