CORDA-3612 - Delete flow results/ exceptions once received by rpc client (#6566)

Enhance rpc acknowledgement method (`removeClientId`) to remove checkpoint 
from all checkpoint database tables.

Optimize `CheckpointStorage.removeCheckpoint` to not delete from all checkpoint
tables if not needed. This includes excluding the results (`DBFlowResult`) and 
exceptions (`DBFlowException`) tables.
This commit is contained in:
Kyriakos Tharrouniatis 2020-08-05 12:40:35 +01:00 committed by GitHub
parent 5d42b8847c
commit 5192a9a2dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 78 additions and 18 deletions

View File

@ -32,9 +32,12 @@ interface CheckpointStorage {
/**
* Remove existing checkpoint from the store.
*
* [mayHavePersistentResults] is used for optimization. If set to [false] it will not attempt to delete the database result or the database exception.
* Please note that if there is a doubt on whether a flow could be finished or not [mayHavePersistentResults] should be set to [true].
* @return whether the id matched a checkpoint that was removed.
*/
fun removeCheckpoint(id: StateMachineRunId): Boolean
fun removeCheckpoint(id: StateMachineRunId, mayHavePersistentResults: Boolean = true): Boolean
/**
* Load an existing checkpoint from the store.

View File

@ -483,14 +483,16 @@ class DBCheckpointStorage(
}
@Suppress("MagicNumber")
override fun removeCheckpoint(id: StateMachineRunId): Boolean {
override fun removeCheckpoint(id: StateMachineRunId, mayHavePersistentResults: Boolean): Boolean {
var deletedRows = 0
val flowId = id.uuid.toString()
deletedRows += deleteRow(DBFlowMetadata::class.java, DBFlowMetadata::flowId.name, flowId)
deletedRows += deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId)
deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, flowId)
deletedRows += deleteRow(DBFlowCheckpointBlob::class.java, DBFlowCheckpointBlob::flowId.name, flowId)
deletedRows += deleteRow(DBFlowCheckpoint::class.java, DBFlowCheckpoint::flowId.name, flowId)
deletedRows += deleteRow(DBFlowCheckpointBlob::class.java, DBFlowCheckpointBlob::flowId.name, flowId)
if (mayHavePersistentResults) {
deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, flowId)
deletedRows += deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId)
}
deletedRows += deleteRow(DBFlowMetadata::class.java, DBFlowMetadata::flowId.name, flowId)
return deletedRows >= 2
}

View File

@ -58,9 +58,11 @@ sealed class Action {
data class PersistCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint, val isCheckpointUpdate: Boolean) : Action()
/**
* Remove the checkpoint corresponding to [id].
* Remove the checkpoint corresponding to [id]. [mayHavePersistentResults] denotes that at the time of injecting a [RemoveCheckpoint]
* the flow could have persisted its database result or exception.
* For more information see [CheckpointStorage.removeCheckpoint].
*/
data class RemoveCheckpoint(val id: StateMachineRunId) : Action()
data class RemoveCheckpoint(val id: StateMachineRunId, val mayHavePersistentResults: Boolean = false) : Action()
/**
* Persist the deduplication facts of [deduplicationHandlers].

View File

@ -151,7 +151,7 @@ internal class ActionExecutorImpl(
@Suspendable
private fun executeRemoveCheckpoint(action: Action.RemoveCheckpoint) {
checkpointStorage.removeCheckpoint(action.id)
checkpointStorage.removeCheckpoint(action.id, action.mayHavePersistentResults)
}
@Suspendable

View File

@ -329,7 +329,7 @@ internal class SingleThreadedStateMachineManager(
// The checkpoint and soft locks are removed here instead of relying on the processing of the next event after setting
// the killed flag. This is to ensure a flow can be removed from the database, even if it is stuck in a infinite loop.
database.transaction {
checkpointStorage.removeCheckpoint(id)
checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
serviceHub.vaultService.softLockRelease(id.uuid)
}
// the same code is NOT done in remove flow when an error occurs
@ -342,7 +342,7 @@ internal class SingleThreadedStateMachineManager(
true
} else {
// It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists.
database.transaction { checkpointStorage.removeCheckpoint(id) }
database.transaction { checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) }
}
}
return if (killFlowResult) {
@ -936,17 +936,21 @@ internal class SingleThreadedStateMachineManager(
)
override fun removeClientId(clientId: String): Boolean {
var removed = false
var removedFlowId: StateMachineRunId? = null
innerState.withLock {
clientIdsToFlowIds.compute(clientId) { _, existingStatus ->
if (existingStatus != null && existingStatus is FlowWithClientIdStatus.Removed) {
removed = true
clientIdsToFlowIds.computeIfPresent(clientId) { _, existingStatus ->
if (existingStatus is FlowWithClientIdStatus.Removed) {
removedFlowId = existingStatus.flowId
null
} else { // don't remove
} else {
existingStatus
}
}
}
return removed
removedFlowId?.let {
return database.transaction { checkpointStorage.removeCheckpoint(it, mayHavePersistentResults = true) }
}
return false
}
}

View File

@ -44,7 +44,7 @@ class KilledFlowTransition(
}
// The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
if (startingState.isAnyCheckpointPersisted) {
actions.add(Action.RemoveCheckpoint(context.id))
actions.add(Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true))
}
actions.addAll(
arrayOf(

View File

@ -296,6 +296,55 @@ class FlowClientIdTests {
Assert.assertEquals(2, counter)
}
@Test(timeout=300_000)
fun `removing a client id result clears resources properly`() {
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
// assert database status before remove
aliceNode.services.database.transaction {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
}
aliceNode.smm.removeClientId(clientId)
// assert database status after remove
aliceNode.services.database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
}
}
@Test(timeout=300_000)
fun `removing a client id exception clears resources properly`() {
val clientId = UUID.randomUUID().toString()
ResultFlow.hook = { throw IllegalStateException() }
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(Unit)).resultFuture.getOrThrow()
}
// assert database status before remove
aliceNode.services.database.transaction {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
}
aliceNode.smm.removeClientId(clientId)
// assert database status after remove
aliceNode.services.database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
}
}
@Test(timeout=300_000)
fun `flow's client id mapping can only get removed once the flow gets removed`() {
val clientId = UUID.randomUUID().toString()