mirror of
https://github.com/corda/corda.git
synced 2025-05-02 16:53:22 +00:00
CORDA-1840 Smarter checkpoint writing (#3677)
* Explicitly add/update checkpoints rather than calling `addOrUpdate` which will run a `select` statement and then a add or update statement. * Use `currentState.isAnyCheckpointPersisted()` to check for previous checkpoints
This commit is contained in:
parent
333b8d0658
commit
53b2b86d89
@ -11,10 +11,16 @@ import java.util.stream.Stream
|
|||||||
*/
|
*/
|
||||||
interface CheckpointStorage {
|
interface CheckpointStorage {
|
||||||
/**
|
/**
|
||||||
* Add a new checkpoint to the store.
|
* Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id
|
||||||
*/
|
*/
|
||||||
fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>)
|
fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update an existing checkpoint. Will throw if there is not checkpoint for this id.
|
||||||
|
*/
|
||||||
|
fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>)
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove existing checkpoint from the store.
|
* Remove existing checkpoint from the store.
|
||||||
* @return whether the id matched a checkpoint that was removed.
|
* @return whether the id matched a checkpoint that was removed.
|
||||||
|
@ -36,13 +36,22 @@ class DBCheckpointStorage : CheckpointStorage {
|
|||||||
)
|
)
|
||||||
|
|
||||||
override fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>) {
|
override fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>) {
|
||||||
currentDBSession().saveOrUpdate(DBCheckpoint().apply {
|
currentDBSession().save(DBCheckpoint().apply {
|
||||||
checkpointId = id.uuid.toString()
|
checkpointId = id.uuid.toString()
|
||||||
this.checkpoint = checkpoint.bytes
|
this.checkpoint = checkpoint.bytes
|
||||||
log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" }
|
log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" }
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>) {
|
||||||
|
currentDBSession().update(DBCheckpoint().apply {
|
||||||
|
checkpointId = id.uuid.toString()
|
||||||
|
this.checkpoint = checkpoint.bytes
|
||||||
|
log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" }
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
override fun removeCheckpoint(id: StateMachineRunId): Boolean {
|
override fun removeCheckpoint(id: StateMachineRunId): Boolean {
|
||||||
val session = currentDBSession()
|
val session = currentDBSession()
|
||||||
val criteriaBuilder = session.criteriaBuilder
|
val criteriaBuilder = session.criteriaBuilder
|
||||||
|
@ -39,7 +39,7 @@ sealed class Action {
|
|||||||
/**
|
/**
|
||||||
* Persist the specified [checkpoint].
|
* Persist the specified [checkpoint].
|
||||||
*/
|
*/
|
||||||
data class PersistCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint) : Action()
|
data class PersistCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint, val isCheckpointUpdate: Boolean) : Action()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the checkpoint corresponding to [id].
|
* Remove the checkpoint corresponding to [id].
|
||||||
|
@ -97,7 +97,11 @@ class ActionExecutorImpl(
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
private fun executePersistCheckpoint(action: Action.PersistCheckpoint) {
|
private fun executePersistCheckpoint(action: Action.PersistCheckpoint) {
|
||||||
val checkpointBytes = serializeCheckpoint(action.checkpoint)
|
val checkpointBytes = serializeCheckpoint(action.checkpoint)
|
||||||
checkpointStorage.addCheckpoint(action.id, checkpointBytes)
|
if (action.isCheckpointUpdate) {
|
||||||
|
checkpointStorage.updateCheckpoint(action.id, checkpointBytes)
|
||||||
|
} else {
|
||||||
|
checkpointStorage.addCheckpoint(action.id, checkpointBytes)
|
||||||
|
}
|
||||||
checkpointingMeter.mark()
|
checkpointingMeter.mark()
|
||||||
checkpointSizesThisSecond.update(checkpointBytes.size.toLong())
|
checkpointSizesThisSecond.update(checkpointBytes.size.toLong())
|
||||||
var lastUpdateTime = lastBandwidthUpdate.get()
|
var lastUpdateTime = lastBandwidthUpdate.get()
|
||||||
|
@ -162,7 +162,7 @@ class TopLevelTransition(
|
|||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
actions.addAll(arrayOf(
|
actions.addAll(arrayOf(
|
||||||
Action.PersistCheckpoint(context.id, newCheckpoint),
|
Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
|
||||||
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
|
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
|
||||||
Action.CommitTransaction,
|
Action.CommitTransaction,
|
||||||
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
|
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
|
||||||
|
@ -78,7 +78,7 @@ class UnstartedFlowTransition(
|
|||||||
private fun TransitionBuilder.createInitialCheckpoint() {
|
private fun TransitionBuilder.createInitialCheckpoint() {
|
||||||
actions.addAll(arrayOf(
|
actions.addAll(arrayOf(
|
||||||
Action.CreateTransaction,
|
Action.CreateTransaction,
|
||||||
Action.PersistCheckpoint(context.id, currentState.checkpoint),
|
Action.PersistCheckpoint(context.id, currentState.checkpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
|
||||||
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
|
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
|
||||||
Action.CommitTransaction,
|
Action.CommitTransaction,
|
||||||
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers)
|
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user