Save metrics in one pipelined Redis query.

This saves the metrics in one pipelined Redis query, rather than one
query per aggregation level. This saves 2 queries per save.
This commit is contained in:
Orne Brocaar 2024-01-31 12:11:54 +00:00
parent b65faf7b98
commit 8cde64c4b7

View File

@ -83,81 +83,77 @@ pub async fn save_state(name: &str, state: &str) -> Result<()> {
} }
pub async fn save(name: &str, record: &Record) -> Result<()> { pub async fn save(name: &str, record: &Record) -> Result<()> {
for a in get_aggregations() {
save_for_interval(a, name, record).await?;
}
Ok(())
}
async fn save_for_interval(a: Aggregation, name: &str, record: &Record) -> Result<()> {
if record.metrics.is_empty() { if record.metrics.is_empty() {
return Ok(()); return Ok(());
} }
let ttl = get_ttl(a);
let ts: DateTime<Local> = match a {
Aggregation::HOUR => Local
.with_ymd_and_hms(
record.time.year(),
record.time.month(),
record.time.day(),
record.time.hour(),
0,
0,
)
.unwrap(),
Aggregation::DAY => Local
.with_ymd_and_hms(
record.time.year(),
record.time.month(),
record.time.day(),
0,
0,
0,
)
.unwrap(),
Aggregation::MONTH => Local
.with_ymd_and_hms(record.time.year(), record.time.month(), 1, 0, 0, 0)
.unwrap(),
};
let key = get_key(&name, a, ts);
let mut pipe = redis::pipe(); let mut pipe = redis::pipe();
pipe.atomic(); pipe.atomic();
for (k, v) in &record.metrics { for a in get_aggregations() {
// Passing a reference to hincr will return a runtime error. let ttl = get_ttl(a);
let k = k.clone();
let v = *v;
match record.kind { let ts: DateTime<Local> = match a {
Kind::COUNTER => { Aggregation::HOUR => Local
pipe.cmd("HSET").arg(&key).arg(k).arg(v).ignore(); .with_ymd_and_hms(
} record.time.year(),
Kind::ABSOLUTE => { record.time.month(),
pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore(); record.time.day(),
} record.time.hour(),
Kind::GAUGE => { 0,
pipe.cmd("HINCRBYFLOAT") 0,
.arg(&key) )
.arg(format!("_{}_count", k)) .unwrap(),
.arg(1.0) Aggregation::DAY => Local
.ignore(); .with_ymd_and_hms(
pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore(); record.time.year(),
record.time.month(),
record.time.day(),
0,
0,
0,
)
.unwrap(),
Aggregation::MONTH => Local
.with_ymd_and_hms(record.time.year(), record.time.month(), 1, 0, 0, 0)
.unwrap(),
};
let key = get_key(&name, a, ts);
for (k, v) in &record.metrics {
// Passing a reference to hincr will return a runtime error.
let k = k.clone();
let v = *v;
match record.kind {
Kind::COUNTER => {
pipe.cmd("HSET").arg(&key).arg(k).arg(v).ignore();
}
Kind::ABSOLUTE => {
pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore();
}
Kind::GAUGE => {
pipe.cmd("HINCRBYFLOAT")
.arg(&key)
.arg(format!("_{}_count", k))
.arg(1.0)
.ignore();
pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore();
}
} }
} }
pipe.cmd("PEXPIRE")
.arg(&key)
.arg(ttl.as_millis() as usize)
.ignore();
info!(name = %name, aggregation = %a, "Metrics saved");
} }
pipe.cmd("PEXPIRE") pipe.query_async(&mut get_async_redis_conn().await?).await?;
.arg(&key)
.arg(ttl.as_millis() as usize)
.ignore()
.query_async(&mut get_async_redis_conn().await?)
.await?;
info!(name = %name, aggregation = %a, "Metrics saved");
Ok(()) Ok(())
} }
@ -339,9 +335,7 @@ pub mod test {
}, },
]; ];
for r in &records { for r in &records {
save_for_interval(Aggregation::HOUR, "test", r) save("test", r).await.unwrap();
.await
.unwrap();
} }
let resp = get( let resp = get(
@ -408,9 +402,7 @@ pub mod test {
}, },
]; ];
for r in &records { for r in &records {
save_for_interval(Aggregation::DAY, "test", r) save("test", r).await.unwrap();
.await
.unwrap();
} }
let resp = get( let resp = get(
@ -477,9 +469,7 @@ pub mod test {
}, },
]; ];
for r in &records { for r in &records {
save_for_interval(Aggregation::DAY, "test", r) save("test", r).await.unwrap();
.await
.unwrap();
} }
let resp = get( let resp = get(
@ -546,9 +536,7 @@ pub mod test {
}, },
]; ];
for r in &records { for r in &records {
save_for_interval(Aggregation::MONTH, "test", r) save("test", r).await.unwrap();
.await
.unwrap();
} }
let resp = get( let resp = get(
@ -607,9 +595,7 @@ pub mod test {
}, },
]; ];
for r in &records { for r in &records {
save_for_interval(Aggregation::HOUR, "test", r) save("test", r).await.unwrap();
.await
.unwrap();
} }
let resp = get( let resp = get(
@ -658,9 +644,7 @@ pub mod test {
}, },
]; ];
for r in &records { for r in &records {
save_for_interval(Aggregation::HOUR, "test", r) save("test", r).await.unwrap();
.await
.unwrap();
} }
let resp = get( let resp = get(
@ -709,9 +693,7 @@ pub mod test {
}, },
]; ];
for r in &records { for r in &records {
save_for_interval(Aggregation::HOUR, "test", r) save("test", r).await.unwrap();
.await
.unwrap();
} }
let resp = get( let resp = get(