Convert Local to NaiveDateTime before calculating intervals.

This makes it a lot easier to iterate over the intervals, as we no
longer have to take into account DST changes that could either result in
an invalid or ambiguous date, or not incrementing by the expected
interval. E.g. incrementing by 1 day resulting in a 23 hour increment
because or DST change.

On returning the metrics, we try to convert the NaiveDateTime back into
a DateTime<Local>, failing that, we skip it rather than failing on it.

Closes #415.
This commit is contained in:
Orne Brocaar 2024-05-03 12:25:51 +01:00
parent 5f6ccc35fb
commit 3ec9ee2031

View File

@ -3,7 +3,10 @@ use std::fmt;
use std::time::Duration; use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use chrono::{DateTime, Datelike, Duration as ChronoDuration, Local, TimeZone, Timelike}; use chrono::{
DateTime, Datelike, Duration as ChronoDuration, Local, Months, NaiveDate, NaiveDateTime,
Timelike,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::info; use tracing::info;
@ -62,7 +65,7 @@ fn get_ttl(a: Aggregation) -> Duration {
} }
} }
fn get_key(name: &str, a: Aggregation, dt: DateTime<Local>) -> String { fn get_key(name: &str, a: Aggregation, dt: NaiveDateTime) -> String {
redis_key(format!( redis_key(format!(
"metrics:{{{}}}:{}:{}", "metrics:{{{}}}:{}:{}",
name, name,
@ -97,40 +100,31 @@ pub async fn save(name: &str, record: &Record, aggregations: &[Aggregation]) ->
for a in aggregations { for a in aggregations {
let ttl = get_ttl(*a); let ttl = get_ttl(*a);
let ts: DateTime<Local> = match a { let ts: NaiveDateTime = match a {
Aggregation::MINUTE => Local Aggregation::MINUTE => {
.with_ymd_and_hms( NaiveDate::from_ymd_opt(record.time.year(), record.time.month(), record.time.day())
record.time.year(), .ok_or_else(|| anyhow!("Invalid date"))?
record.time.month(), .and_hms_opt(record.time.hour(), record.time.minute(), 0)
record.time.day(), .ok_or_else(|| anyhow!("Invalid time"))?
record.time.hour(), }
record.time.minute(), Aggregation::HOUR => {
0, NaiveDate::from_ymd_opt(record.time.year(), record.time.month(), record.time.day())
) .ok_or_else(|| anyhow!("Invalid date"))?
.unwrap(), .and_hms_opt(record.time.hour(), 0, 0)
Aggregation::HOUR => Local .ok_or_else(|| anyhow!("Invalid time"))?
.with_ymd_and_hms( }
record.time.year(), Aggregation::DAY => {
record.time.month(), NaiveDate::from_ymd_opt(record.time.year(), record.time.month(), record.time.day())
record.time.day(), .ok_or_else(|| anyhow!("Invalid date"))?
record.time.hour(), .and_hms_opt(0, 0, 0)
0, .ok_or_else(|| anyhow!("Invalid time"))?
0, }
) Aggregation::MONTH => {
.unwrap(), NaiveDate::from_ymd_opt(record.time.year(), record.time.month(), 1)
Aggregation::DAY => Local .ok_or_else(|| anyhow!("Invalid date"))?
.with_ymd_and_hms( .and_hms_opt(0, 0, 0)
record.time.year(), .ok_or_else(|| anyhow!("Invalid time"))?
record.time.month(), }
record.time.day(),
0,
0,
0,
)
.unwrap(),
Aggregation::MONTH => Local
.with_ymd_and_hms(record.time.year(), record.time.month(), 1, 0, 0, 0)
.unwrap(),
}; };
let key = get_key(name, *a, ts); let key = get_key(name, *a, ts);
@ -189,30 +183,18 @@ pub async fn get(
end: DateTime<Local>, end: DateTime<Local>,
) -> Result<Vec<Record>> { ) -> Result<Vec<Record>> {
let mut keys: Vec<String> = Vec::new(); let mut keys: Vec<String> = Vec::new();
let mut timestamps: Vec<DateTime<Local>> = Vec::new(); let mut timestamps: Vec<NaiveDateTime> = Vec::new();
match a { match a {
Aggregation::MINUTE => { Aggregation::MINUTE => {
let mut ts = Local let mut ts = NaiveDate::from_ymd_opt(start.year(), start.month(), start.day())
.with_ymd_and_hms( .ok_or_else(|| anyhow!("Invalid date"))?
start.year(), .and_hms_opt(start.hour(), start.minute(), 0)
start.month(), .ok_or_else(|| anyhow!("Invalid time"))?;
start.day(), let end = NaiveDate::from_ymd_opt(end.year(), end.month(), end.day())
start.hour(), .ok_or_else(|| anyhow!("Invalid date"))?
start.minute(), .and_hms_opt(end.hour(), end.minute(), 0)
0, .ok_or_else(|| anyhow!("Invalid time"))?;
)
.unwrap();
let end = Local
.with_ymd_and_hms(
end.year(),
end.month(),
end.day(),
end.hour(),
end.minute(),
0,
)
.unwrap();
while ts.le(&end) { while ts.le(&end) {
timestamps.push(ts); timestamps.push(ts);
@ -221,74 +203,53 @@ pub async fn get(
} }
} }
Aggregation::HOUR => { Aggregation::HOUR => {
let mut ts = Local let mut ts = NaiveDate::from_ymd_opt(start.year(), start.month(), start.day())
.with_ymd_and_hms(start.year(), start.month(), start.day(), start.hour(), 0, 0) .ok_or_else(|| anyhow!("Invalid date"))?
.unwrap(); .and_hms_opt(start.hour(), 0, 0)
let end = Local .ok_or_else(|| anyhow!("Invalid time"))?;
.with_ymd_and_hms(end.year(), end.month(), end.day(), end.hour(), 0, 0) let end = NaiveDate::from_ymd_opt(end.year(), end.month(), end.day())
.unwrap(); .ok_or_else(|| anyhow!("Invalid date"))?
.and_hms_opt(end.hour(), 0, 0)
.ok_or_else(|| anyhow!("Invalid time"))?;
while ts.le(&end) { while ts.le(&end) {
timestamps.push(ts); timestamps.push(ts);
keys.push(get_key(name, a, ts)); keys.push(get_key(name, a, ts));
ts += ChronoDuration::try_hours(1).unwrap(); ts += ChronoDuration::hours(1);
} }
} }
Aggregation::DAY => { Aggregation::DAY => {
let mut ts = Local let mut ts = NaiveDate::from_ymd_opt(start.year(), start.month(), start.day())
.with_ymd_and_hms(start.year(), start.month(), start.day(), 0, 0, 0) .ok_or_else(|| anyhow!("Invalid date"))?
.unwrap(); .and_hms_opt(0, 0, 0)
let end = Local .ok_or_else(|| anyhow!("Invalid time"))?;
.with_ymd_and_hms(end.year(), end.month(), end.day(), 0, 0, 0) let end = NaiveDate::from_ymd_opt(end.year(), end.month(), end.day())
.unwrap(); .ok_or_else(|| anyhow!("Invalid date"))?
.and_hms_opt(0, 0, 0)
.ok_or_else(|| anyhow!("Invalid time"))?;
while ts.le(&end) { while ts.le(&end) {
timestamps.push(ts); timestamps.push(ts);
keys.push(get_key(name, a, ts)); keys.push(get_key(name, a, ts));
ts = { ts += ChronoDuration::days(1);
if (ts + ChronoDuration::try_days(1).unwrap()).day() == ts.day() {
// In case of DST to non-DST transition, the ts is incremented with less
// than 24h and we end up with the same day. Therefore we increment by two
// days.
(ts + ChronoDuration::try_days(2).unwrap())
.date_naive()
.and_hms_opt(0, 0, 0)
.unwrap()
.and_local_timezone(Local)
.unwrap()
} else {
// Make sure that the timestamp stays at midnight in case of non-DST to DST
// change.
(ts + ChronoDuration::try_days(1).unwrap())
.date_naive()
.and_hms_opt(0, 0, 0)
.unwrap()
.and_local_timezone(Local)
.unwrap()
}
};
} }
} }
Aggregation::MONTH => { Aggregation::MONTH => {
let mut ts = Local let mut ts = NaiveDate::from_ymd_opt(start.year(), start.month(), 1)
.with_ymd_and_hms(start.year(), start.month(), 1, 0, 0, 0) .ok_or_else(|| anyhow!("Invalid date"))?
.unwrap(); .and_hms_opt(0, 0, 0)
let end = Local .ok_or_else(|| anyhow!("Invalid time"))?;
.with_ymd_and_hms(end.year(), end.month(), 1, 0, 0, 0) let end = NaiveDate::from_ymd_opt(end.year(), end.month(), 1)
.unwrap(); .ok_or_else(|| anyhow!("Invalid date"))?
.and_hms_opt(0, 0, 0)
.ok_or_else(|| anyhow!("Invalid time"))?;
while ts.le(&end) { while ts.le(&end) {
timestamps.push(ts); timestamps.push(ts);
keys.push(get_key(name, a, ts)); keys.push(get_key(name, a, ts));
ts = if ts.month() == 12 { ts = ts
Local .checked_add_months(Months::new(1))
.with_ymd_and_hms(ts.year() + 1, 1, 1, 0, 0, 0) .ok_or_else(|| anyhow!("Add month error"))?;
.unwrap()
} else {
Local
.with_ymd_and_hms(ts.year(), ts.month() + 1, 1, 0, 0, 0)
.unwrap()
};
} }
} }
} }
@ -308,6 +269,11 @@ pub async fn get(
let mut out: Vec<Record> = Vec::new(); let mut out: Vec<Record> = Vec::new();
for (i, r) in res.iter().enumerate() { for (i, r) in res.iter().enumerate() {
let tz = match timestamps[i].and_local_timezone(Local) {
chrono::LocalResult::Single(v) => v,
_ => continue,
};
let mut metrics = r.clone(); let mut metrics = r.clone();
// In case of GAUGE values, the total aggregated value must be divided by the // In case of GAUGE values, the total aggregated value must be divided by the
@ -328,7 +294,7 @@ pub async fn get(
} }
out.push(Record { out.push(Record {
time: timestamps[i], time: tz,
kind, kind,
metrics: metrics metrics: metrics
.iter() .iter()
@ -345,6 +311,7 @@ pub async fn get(
pub mod test { pub mod test {
use super::*; use super::*;
use crate::test; use crate::test;
use chrono::TimeZone;
#[tokio::test] #[tokio::test]
async fn test_minute() { async fn test_minute() {