diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index b1e43c5d74..62214e8f78 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -32,9 +32,12 @@ interface CheckpointStorage { /** * Remove existing checkpoint from the store. + * + * [mayHavePersistentResults] is used for optimization. If set to [false] it will not attempt to delete the database result or the database exception. + * Please note that if there is a doubt on whether a flow could be finished or not [mayHavePersistentResults] should be set to [true]. * @return whether the id matched a checkpoint that was removed. */ - fun removeCheckpoint(id: StateMachineRunId): Boolean + fun removeCheckpoint(id: StateMachineRunId, mayHavePersistentResults: Boolean = true): Boolean /** * Load an existing checkpoint from the store. diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 4c4132daa7..8dcdaa54d5 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -483,14 +483,16 @@ class DBCheckpointStorage( } @Suppress("MagicNumber") - override fun removeCheckpoint(id: StateMachineRunId): Boolean { + override fun removeCheckpoint(id: StateMachineRunId, mayHavePersistentResults: Boolean): Boolean { var deletedRows = 0 val flowId = id.uuid.toString() - deletedRows += deleteRow(DBFlowMetadata::class.java, DBFlowMetadata::flowId.name, flowId) - deletedRows += deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId) - deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, flowId) - deletedRows += deleteRow(DBFlowCheckpointBlob::class.java, DBFlowCheckpointBlob::flowId.name, flowId) deletedRows += deleteRow(DBFlowCheckpoint::class.java, DBFlowCheckpoint::flowId.name, flowId) + deletedRows += deleteRow(DBFlowCheckpointBlob::class.java, DBFlowCheckpointBlob::flowId.name, flowId) + if (mayHavePersistentResults) { + deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, flowId) + deletedRows += deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId) + } + deletedRows += deleteRow(DBFlowMetadata::class.java, DBFlowMetadata::flowId.name, flowId) return deletedRows >= 2 } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt index 6b17fe0870..85dd050429 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt @@ -58,9 +58,11 @@ sealed class Action { data class PersistCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint, val isCheckpointUpdate: Boolean) : Action() /** - * Remove the checkpoint corresponding to [id]. + * Remove the checkpoint corresponding to [id]. [mayHavePersistentResults] denotes that at the time of injecting a [RemoveCheckpoint] + * the flow could have persisted its database result or exception. + * For more information see [CheckpointStorage.removeCheckpoint]. */ - data class RemoveCheckpoint(val id: StateMachineRunId) : Action() + data class RemoveCheckpoint(val id: StateMachineRunId, val mayHavePersistentResults: Boolean = false) : Action() /** * Persist the deduplication facts of [deduplicationHandlers]. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 7f31d0e743..ccb9cb2656 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -151,7 +151,7 @@ internal class ActionExecutorImpl( @Suspendable private fun executeRemoveCheckpoint(action: Action.RemoveCheckpoint) { - checkpointStorage.removeCheckpoint(action.id) + checkpointStorage.removeCheckpoint(action.id, action.mayHavePersistentResults) } @Suspendable diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index f269f69b7f..ae39cad3bf 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -329,7 +329,7 @@ internal class SingleThreadedStateMachineManager( // The checkpoint and soft locks are removed here instead of relying on the processing of the next event after setting // the killed flag. This is to ensure a flow can be removed from the database, even if it is stuck in a infinite loop. database.transaction { - checkpointStorage.removeCheckpoint(id) + checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) serviceHub.vaultService.softLockRelease(id.uuid) } // the same code is NOT done in remove flow when an error occurs @@ -342,7 +342,7 @@ internal class SingleThreadedStateMachineManager( true } else { // It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists. - database.transaction { checkpointStorage.removeCheckpoint(id) } + database.transaction { checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) } } } return if (killFlowResult) { @@ -936,17 +936,21 @@ internal class SingleThreadedStateMachineManager( ) override fun removeClientId(clientId: String): Boolean { - var removed = false + var removedFlowId: StateMachineRunId? = null innerState.withLock { - clientIdsToFlowIds.compute(clientId) { _, existingStatus -> - if (existingStatus != null && existingStatus is FlowWithClientIdStatus.Removed) { - removed = true + clientIdsToFlowIds.computeIfPresent(clientId) { _, existingStatus -> + if (existingStatus is FlowWithClientIdStatus.Removed) { + removedFlowId = existingStatus.flowId null - } else { // don't remove + } else { existingStatus } } } - return removed + + removedFlowId?.let { + return database.transaction { checkpointStorage.removeCheckpoint(it, mayHavePersistentResults = true) } + } + return false } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt index 9c44f5988c..bc059668d3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt @@ -44,7 +44,7 @@ class KilledFlowTransition( } // The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow] if (startingState.isAnyCheckpointPersisted) { - actions.add(Action.RemoveCheckpoint(context.id)) + actions.add(Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true)) } actions.addAll( arrayOf( diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt index d2a7ffa4bf..efdea60199 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt @@ -296,6 +296,55 @@ class FlowClientIdTests { Assert.assertEquals(2, counter) } + @Test(timeout=300_000) + fun `removing a client id result clears resources properly`() { + val clientId = UUID.randomUUID().toString() + aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() + // assert database status before remove + aliceNode.services.database.transaction { + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + } + + aliceNode.smm.removeClientId(clientId) + + // assert database status after remove + aliceNode.services.database.transaction { + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + } + } + + @Test(timeout=300_000) + fun `removing a client id exception clears resources properly`() { + val clientId = UUID.randomUUID().toString() + ResultFlow.hook = { throw IllegalStateException() } + assertFailsWith { + aliceNode.services.startFlowWithClientId(clientId, ResultFlow(Unit)).resultFuture.getOrThrow() + } + // assert database status before remove + aliceNode.services.database.transaction { + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + assertEquals(1, findRecordsFromDatabase().size) + } + + aliceNode.smm.removeClientId(clientId) + + // assert database status after remove + aliceNode.services.database.transaction { + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + assertEquals(0, findRecordsFromDatabase().size) + } + } + @Test(timeout=300_000) fun `flow's client id mapping can only get removed once the flow gets removed`() { val clientId = UUID.randomUUID().toString()