ENT-5886 Parameterised queries in DBCheckpointStorage (#6768)

Direct SQL was causing issues with the saving of instants not
maintaining their timezones properly.

Changing to parameterised queries fixed this issue.

Also changed other queries to do the same, since it is good practice to
query in this way.
This commit is contained in:
Dan Newton
2020-10-22 13:38:39 +01:00
committed by GitHub
parent d5790a6251
commit 72c359b165

View File

@ -435,9 +435,10 @@ class DBCheckpointStorage(
} else if (checkpoint.status == FlowStatus.FAILED) { } else if (checkpoint.status == FlowStatus.FAILED) {
// We need to update only the 'flowState' to null, and we don't want to update the checkpoint state // We need to update only the 'flowState' to null, and we don't want to update the checkpoint state
// because we want to retain the last clean checkpoint state, therefore just use a query for that update. // because we want to retain the last clean checkpoint state, therefore just use a query for that update.
val sqlQuery = "Update ${NODE_DATABASE_PREFIX}checkpoint_blobs set flow_state = null where flow_id = '$flowId'" currentDBSession()
val query = currentDBSession().createNativeQuery(sqlQuery) .createNativeQuery("Update ${NODE_DATABASE_PREFIX}checkpoint_blobs set flow_state = null where flow_id = :flow_id")
query.executeUpdate() .setParameter("flow_id", flowId)
.executeUpdate()
null null
} else { } else {
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
@ -491,12 +492,11 @@ class DBCheckpointStorage(
} }
override fun markAllPaused() { override fun markAllPaused() {
val session = currentDBSession() currentDBSession()
val runnableOrdinals = RUNNABLE_CHECKPOINTS.map { "${it.ordinal}" }.joinToString { it } .createNativeQuery("Update ${NODE_DATABASE_PREFIX}checkpoints set status = :paused_status where status in :runnable_statuses")
val sqlQuery = "Update ${NODE_DATABASE_PREFIX}checkpoints set status = ${FlowStatus.PAUSED.ordinal} " + .setParameter("paused_status", FlowStatus.PAUSED.ordinal)
"where status in ($runnableOrdinals)" .setParameter("runnable_statuses", RUNNABLE_CHECKPOINTS.map { it.ordinal })
val query = session.createNativeQuery(sqlQuery) .executeUpdate()
query.executeUpdate()
} }
@Suppress("MagicNumber") @Suppress("MagicNumber")
@ -627,8 +627,11 @@ class DBCheckpointStorage(
} }
override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) { override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) {
val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set compatible = $compatible where flow_id = '${runId.uuid}'" currentDBSession()
currentDBSession().createNativeQuery(update).executeUpdate() .createNativeQuery("Update ${NODE_DATABASE_PREFIX}checkpoints set compatible = :compatible where flow_id = :flow_id")
.setParameter("compatible", compatible)
.setParameter("flow_id", runId.uuid.toString())
.executeUpdate()
} }
private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint, now: Instant): DBFlowMetadata { private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint, now: Instant): DBFlowMetadata {
@ -687,11 +690,11 @@ class DBCheckpointStorage(
} }
private fun setDBFlowMetadataFinishTime(flowId: String, now: Instant) { private fun setDBFlowMetadataFinishTime(flowId: String, now: Instant) {
val session = currentDBSession() currentDBSession()
val sqlQuery = "Update ${NODE_DATABASE_PREFIX}flow_metadata set finish_time = '$now' " + .createNativeQuery("Update ${NODE_DATABASE_PREFIX}flow_metadata set finish_time = :finish_time where flow_id = :flow_id")
"where flow_id = '$flowId'" .setParameter("finish_time", now)
val query = session.createNativeQuery(sqlQuery) .setParameter("flow_id", flowId)
query.executeUpdate() .executeUpdate()
} }
private fun InvocationContext.getStartedType(): StartReason { private fun InvocationContext.getStartedType(): StartReason {