Fix Redis pipelines for Redis Cluster.

redis::pipe() can't be used with the ClusterClient struct; instead, we
must use cluster_pipe() to start the pipeline. This commit implements a
wrapper that constructs the pipeline based on the Redis setup in use.
This commit is contained in:
Orne Brocaar 2022-11-30 08:47:25 +00:00
parent c1c89d06f8
commit dda9d3ffac
9 changed files with 115 additions and 28 deletions

View File

@ -476,7 +476,7 @@ async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<h
let mut c = get_redis_conn()?;
let key = redis_key(format!("backend:async:{}", transaction_id));
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("XADD")
.arg(&key)
@ -490,7 +490,7 @@ async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<h
.arg(&key)
.arg(30_i64)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
Ok(())
}

View File

@ -33,7 +33,7 @@ pub async fn log_event_for_device(typ: &str, dev_eui: &str, b: &[u8]) -> Result<
// per device stream
if conf.monitoring.per_device_event_log_max_history > 0 {
let key = redis_key(format!("device:{{{}}}:stream:event", dev_eui));
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("XADD")
.arg(&key)
@ -47,7 +47,7 @@ pub async fn log_event_for_device(typ: &str, dev_eui: &str, b: &[u8]) -> Result<
.arg(&key)
.arg(conf.monitoring.per_device_event_log_ttl.as_millis() as usize)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
}
// global device stream

View File

@ -49,7 +49,7 @@ pub async fn log_uplink_for_gateways(ufl: &api::UplinkFrameLog) -> Result<()> {
if conf.monitoring.per_gateway_frame_log_max_history > 0 {
let key = redis_key(format!("gw:{{{}}}:stream:frame", gateway_id));
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("XADD")
.arg(&key)
@ -63,7 +63,7 @@ pub async fn log_uplink_for_gateways(ufl: &api::UplinkFrameLog) -> Result<()> {
.arg(&key)
.arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
}
// global gateway stream
@ -102,7 +102,7 @@ pub async fn log_downlink_for_gateway(dfl: &api::DownlinkFrameLog) -> Result<()>
// per gateway stream
if conf.monitoring.per_gateway_frame_log_max_history > 0 {
let key = redis_key(format!("gw:{{{}}}:stream:frame", dfl.gateway_id));
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("XADD")
.arg(&key)
@ -116,7 +116,7 @@ pub async fn log_downlink_for_gateway(dfl: &api::DownlinkFrameLog) -> Result<()>
.arg(&key)
.arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
}
// global gateway stream
@ -154,7 +154,7 @@ pub async fn log_uplink_for_device(ufl: &api::UplinkFrameLog) -> Result<()> {
if conf.monitoring.per_device_frame_log_max_history > 0 {
let key = redis_key(format!("device:{{{}}}:stream:frame", ufl.dev_eui));
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("XADD")
.arg(&key)
@ -168,7 +168,7 @@ pub async fn log_uplink_for_device(ufl: &api::UplinkFrameLog) -> Result<()> {
.arg(&key)
.arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
}
// global device stream
@ -206,7 +206,7 @@ pub async fn log_downlink_for_device(dfl: &api::DownlinkFrameLog) -> Result<()>
if conf.monitoring.per_device_frame_log_max_history > 0 {
let key = redis_key(format!("device:{{{}}}:stream:frame", dfl.dev_eui));
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("XADD")
.arg(&key)
@ -220,7 +220,7 @@ pub async fn log_downlink_for_device(dfl: &api::DownlinkFrameLog) -> Result<()>
.arg(&key)
.arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
}
// global device stream

View File

@ -34,7 +34,7 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
let mut c = get_redis_conn()?;
// Atomic add and pexpire.
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("SADD")
.arg(&addr_key)
@ -44,7 +44,7 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
.arg(&addr_key)
.arg(ttl)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
// In case there is a pending rejoin session, make sure that the new
// DevAddr also resolves to the device-session.
@ -52,7 +52,7 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
let pending_addr = DevAddr::from_slice(&pending_ds.dev_addr)?;
let pending_addr_key = redis_key(format!("devaddr:{{{}}}", pending_addr));
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("SADD")
.arg(&pending_addr_key)
@ -62,7 +62,7 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
.arg(&pending_addr_key)
.arg(ttl)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
}
redis::cmd("PSETEX")

View File

@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
use anyhow::{Context, Result};
use anyhow::Result;
use chrono::{DateTime, Datelike, Duration as ChronoDuration, Local, TimeZone, Timelike};
use serde::{Deserialize, Serialize};
use tokio::task;
@ -124,7 +124,7 @@ async fn save_for_interval(a: Aggregation, name: &str, record: &Record) -> Resul
move || -> Result<()> {
let mut c = get_redis_conn()?;
let key = get_key(&name, a, ts);
let mut pipe = redis::pipe();
let mut pipe = c.new_pipeline();
pipe.atomic();
for (k, v) in &record.metrics {
@ -154,8 +154,7 @@ async fn save_for_interval(a: Aggregation, name: &str, record: &Record) -> Resul
.arg(&key)
.arg(ttl.as_millis() as usize)
.ignore()
.query(&mut *c)
.context("Execute metrics pipeline")?;
.query(&mut c)?;
Ok(())
}
@ -252,13 +251,13 @@ pub async fn get(
let keys = keys.clone();
move || -> Result<Vec<Record>> {
let mut c = get_redis_conn()?;
let mut pipe = redis::pipe();
let mut pipe = c.new_pipeline();
for k in &keys {
pipe.cmd("HGETALL").arg(k);
}
let res: Vec<HashMap<String, f64>> = pipe.query(&mut *c)?;
let res: Vec<HashMap<String, f64>> = pipe.query(&mut c)?;
let mut out: Vec<Record> = Vec::new();
for (i, r) in res.iter().enumerate() {

View File

@ -52,6 +52,17 @@ pub enum RedisPoolConnection {
ClusterClient(PooledConnection<redis::cluster::ClusterClient>),
}
impl RedisPoolConnection {
    /// Construct a pipeline matching the underlying Redis deployment.
    ///
    /// A cluster connection gets a `redis::cluster::ClusterPipeline`
    /// (created via `cluster_pipe()`), since pipelines created by
    /// `redis::pipe()` can not be executed against a cluster client; a
    /// plain connection gets a regular `redis::Pipeline`.
    pub fn new_pipeline(&self) -> RedisPipeline {
        if let RedisPoolConnection::ClusterClient(_) = self {
            RedisPipeline::ClusterPipeline(redis::cluster::cluster_pipe())
        } else {
            RedisPipeline::Pipeline(redis::pipe())
        }
    }
}
impl Deref for RedisPoolConnection {
type Target = dyn redis::ConnectionLike;
@ -74,6 +85,84 @@ impl DerefMut for RedisPoolConnection {
}
}
/// A Redis pipeline that works transparently with both a single-node
/// Redis client and a Redis Cluster client.
///
/// `redis::pipe()` pipelines can not be executed through a
/// `ClusterClient`; `cluster_pipe()` must be used instead. This enum
/// holds whichever pipeline type matches the configured setup.
pub enum RedisPipeline {
    /// Pipeline for a single-node Redis setup (`redis::pipe()`).
    Pipeline(redis::Pipeline),
    /// Pipeline for a Redis Cluster setup (`redis::cluster::cluster_pipe()`).
    ClusterPipeline(redis::cluster::ClusterPipeline),
}
impl RedisPipeline {
    /// Start a new command in the pipeline, delegating to the wrapped
    /// pipeline variant.
    pub fn cmd(&mut self, name: &str) -> &mut Self {
        match self {
            Self::Pipeline(p) => {
                p.cmd(name);
            }
            Self::ClusterPipeline(p) => {
                p.cmd(name);
            }
        }
        self
    }

    /// Append an argument to the most recently added command.
    pub fn arg<T: redis::ToRedisArgs>(&mut self, arg: T) -> &mut Self {
        match self {
            Self::Pipeline(p) => {
                p.arg(arg);
            }
            Self::ClusterPipeline(p) => {
                p.arg(arg);
            }
        }
        self
    }

    /// Ignore the result of the most recently added command when the
    /// pipeline is queried.
    pub fn ignore(&mut self) -> &mut Self {
        match self {
            Self::Pipeline(p) => {
                p.ignore();
            }
            Self::ClusterPipeline(p) => {
                p.ignore();
            }
        }
        self
    }

    /// Make the pipeline atomic (MULTI/EXEC) where supported.
    ///
    /// For the cluster variant this is currently a no-op.
    pub fn atomic(&mut self) -> &mut Self {
        if let Self::Pipeline(p) = self {
            p.atomic();
        }
        // TODO: ClusterPipeline does not (yet?) provide .atomic() method.
        // https://github.com/redis-rs/redis-rs/issues/731
        self
    }

    /// Execute the pipeline on the given connection and parse the result.
    ///
    /// # Panics
    ///
    /// Panics when the pipeline variant does not match the connection
    /// variant (e.g. a cluster pipeline paired with a non-cluster
    /// connection); both are derived from the same configuration, so a
    /// mismatch indicates a programming error.
    pub fn query<T: redis::FromRedisValue>(
        &mut self,
        con: &mut RedisPoolConnection,
    ) -> redis::RedisResult<T> {
        match (self, con) {
            (Self::Pipeline(p), RedisPoolConnection::Client(c)) => p.query(&mut **c),
            (Self::ClusterPipeline(p), RedisPoolConnection::ClusterClient(c)) => {
                p.query(&mut **c)
            }
            _ => panic!("Mismatch between RedisPipeline and RedisPoolConnection"),
        }
    }
}
pub async fn setup() -> Result<()> {
let conf = config::get();

View File

@ -61,7 +61,7 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> {
// * We need to be able to lookup the session using the DevAddr (potentially
// using the MIC validation).
// * We need to be able to stop a passive-roaming session given a DevEUI.
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("SADD")
.arg(&dev_addr_key)
@ -84,7 +84,7 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> {
.arg(pr_ttl)
.arg(b)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
Ok(())
}

View File

@ -166,7 +166,7 @@ async fn deduplicate_put(key: &str, ttl: Duration, event: &gw::UplinkFrame) -> R
move || -> Result<()> {
let mut c = get_redis_conn()?;
redis::pipe()
c.new_pipeline()
.atomic()
.cmd("SADD")
.arg(&key)
@ -176,7 +176,7 @@ async fn deduplicate_put(key: &str, ttl: Duration, event: &gw::UplinkFrame) -> R
.arg(&key)
.arg(ttl.as_millis() as usize)
.ignore()
.query(&mut *c)?;
.query(&mut c)?;
Ok(())
}

View File

@ -134,8 +134,7 @@ impl Stats {
&format!("gw:{}", self.gateway.as_ref().unwrap().gateway_id),
&m,
)
.await
.context("Save metrics")?;
.await?;
Ok(())
}