mirror of
https://github.com/chirpstack/chirpstack.git
synced 2025-02-21 01:21:21 +00:00
Immediately return Redis conn after use.
Before the Redis connection would be returned once variable c went out of scope, in most cases at the return of the function. This would mean that during the execution of the remaining code within the function, the Redis connection would be reserved. With this change, the Redis connection is immediately returned to the pool after usage.
This commit is contained in:
parent
f57aa32f23
commit
fccf762c39
@ -524,7 +524,7 @@ async fn _handle_xmit_data_req(
|
||||
|
||||
async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<hyper::Body>> {
|
||||
let transaction_id = bp.transaction_id;
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let key = redis_key(format!("backend:async:{}", transaction_id));
|
||||
|
||||
redis::pipe()
|
||||
@ -541,7 +541,7 @@ async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<h
|
||||
.arg(&key)
|
||||
.arg(30_i64)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
Ok(warp::reply().into_response())
|
||||
|
@ -134,13 +134,12 @@ async fn get_client() -> Result<CoreClient> {
|
||||
async fn store_nonce(state: &CsrfToken, nonce: &Nonce) -> Result<()> {
|
||||
trace!("Storing nonce");
|
||||
let key = redis_key(format!("auth:oidc:{}", state.secret()));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
redis::cmd("PSETEX")
|
||||
.arg(key)
|
||||
.arg(Duration::minutes(5).num_milliseconds())
|
||||
.arg(nonce.secret())
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
@ -149,11 +148,10 @@ async fn store_nonce(state: &CsrfToken, nonce: &Nonce) -> Result<()> {
|
||||
async fn get_nonce(state: &CsrfToken) -> Result<Nonce> {
|
||||
trace!("Getting nonce");
|
||||
let key = redis_key(format!("auth:oidc:{}", state.secret()));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let v: String = redis::cmd("GET")
|
||||
.arg(&key)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Get nonce")?;
|
||||
|
||||
|
@ -19,11 +19,10 @@ pub async fn get_geoloc_buffer(
|
||||
|
||||
trace!(dev_eui = %dev_eui, "Getting geolocation buffer");
|
||||
let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let b: Vec<u8> = redis::cmd("GET")
|
||||
.arg(key)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Get geolocation buffer")?;
|
||||
if b.is_empty() {
|
||||
@ -77,7 +76,6 @@ pub async fn save_geoloc_buffer(
|
||||
|
||||
trace!(dev_eui = %dev_eui, "Saving geolocation buffer");
|
||||
let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let buffer = internal::LoraCloudGeolocBuffer {
|
||||
uplinks: items
|
||||
@ -92,7 +90,7 @@ pub async fn save_geoloc_buffer(
|
||||
.arg(key)
|
||||
.arg(ttl.num_milliseconds())
|
||||
.arg(b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(dev_eui = %dev_eui, "Geolocation buffer saved");
|
||||
|
@ -238,14 +238,13 @@ pub mod test {
|
||||
}
|
||||
|
||||
async fn assert_reply(last_id: &str, event: &str, b: &[u8]) -> String {
|
||||
let mut c = get_async_redis_conn().await.unwrap();
|
||||
let srr: StreamReadReply = redis::cmd("XREAD")
|
||||
.arg("COUNT")
|
||||
.arg(1 as usize)
|
||||
.arg("STREAMS")
|
||||
.arg("device:stream:event")
|
||||
.arg(&last_id)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(1, srr.keys.len());
|
||||
|
@ -15,13 +15,12 @@ pub async fn save_rx_info(rx_info: &internal::DeviceGatewayRxInfo) -> Result<()>
|
||||
let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui));
|
||||
let ttl = conf.network.device_session_ttl.as_millis() as usize;
|
||||
let b = rx_info.encode_to_vec();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
redis::cmd("PSETEX")
|
||||
.arg(key)
|
||||
.arg(ttl)
|
||||
.arg(b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(dev_eui = %dev_eui, "Gateway rx-info saved");
|
||||
@ -29,12 +28,11 @@ pub async fn save_rx_info(rx_info: &internal::DeviceGatewayRxInfo) -> Result<()>
|
||||
}
|
||||
|
||||
pub async fn get_rx_info(dev_eui: &EUI64) -> Result<internal::DeviceGatewayRxInfo, Error> {
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui));
|
||||
|
||||
let b: Vec<u8> = redis::cmd("GET")
|
||||
.arg(key)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Get rx-info")?;
|
||||
if b.is_empty() {
|
||||
@ -51,7 +49,6 @@ pub async fn get_rx_info_for_dev_euis(
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let mut keys: Vec<String> = Vec::new();
|
||||
for dev_eui in dev_euis {
|
||||
keys.push(redis_key(format!("device:{{{}}}:gwrx", dev_eui)));
|
||||
@ -59,7 +56,7 @@ pub async fn get_rx_info_for_dev_euis(
|
||||
|
||||
let bb: Vec<Vec<u8>> = redis::cmd("MGET")
|
||||
.arg(keys)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("MGET")?;
|
||||
let mut out: Vec<internal::DeviceGatewayRxInfo> = Vec::new();
|
||||
|
@ -28,7 +28,6 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
|
||||
let ds_key = redis_key(format!("device:{{{}}}:ds", eui));
|
||||
let b = ds.encode_to_vec();
|
||||
let ttl = conf.network.device_session_ttl.as_millis() as usize;
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
// Atomic add and pexpire.
|
||||
redis::pipe()
|
||||
@ -41,7 +40,7 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
|
||||
.arg(&addr_key)
|
||||
.arg(ttl)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
// In case there is a pending rejoin session, make sure that the new
|
||||
@ -60,7 +59,7 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
|
||||
.arg(&pending_addr_key)
|
||||
.arg(ttl)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -68,7 +67,7 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
|
||||
.arg(ds_key)
|
||||
.arg(ttl)
|
||||
.arg(b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(dev_eui = %eui, dev_addr = %addr, "Device-session saved");
|
||||
@ -77,10 +76,10 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
|
||||
|
||||
pub async fn get(dev_eui: &EUI64) -> Result<chirpstack_api::internal::DeviceSession, Error> {
|
||||
let key = redis_key(format!("device:{{{}}}:ds", dev_eui));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let v: Vec<u8> = redis::cmd("GET")
|
||||
.arg(key)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Get device-session")?;
|
||||
if v.is_empty() {
|
||||
@ -93,8 +92,11 @@ pub async fn get(dev_eui: &EUI64) -> Result<chirpstack_api::internal::DeviceSess
|
||||
|
||||
pub async fn delete(dev_eui: &EUI64) -> Result<()> {
|
||||
let key = redis_key(format!("device:{{{}}}:ds", dev_eui));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
redis::cmd("DEL").arg(&key).query_async(&mut c).await?;
|
||||
|
||||
redis::cmd("DEL")
|
||||
.arg(&key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(dev_eui = %dev_eui, "Device-session deleted");
|
||||
Ok(())
|
||||
@ -186,7 +188,7 @@ pub async fn get_for_phypayload_and_incr_f_cnt_up(
|
||||
// Make sure that in case of concurrent calls for the same uplink only one will
|
||||
// pass. Either the concurrent call would read the incremented uplink frame-counter
|
||||
// or it is unable to aquire the lock.
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let lock_key = redis_key(format!(
|
||||
"device:{{{}}}:ds:lock:{}",
|
||||
hex::encode(&ds.dev_eui),
|
||||
@ -198,7 +200,7 @@ pub async fn get_for_phypayload_and_incr_f_cnt_up(
|
||||
.arg("EX")
|
||||
.arg(1_usize)
|
||||
.arg("NX")
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
if !set {
|
||||
@ -295,10 +297,10 @@ pub async fn get_for_phypayload(
|
||||
|
||||
async fn get_dev_euis_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<EUI64>> {
|
||||
let key = redis_key(format!("devaddr:{{{}}}", dev_addr));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let dev_euis: HashSet<Vec<u8>> = redis::cmd("SMEMBERS")
|
||||
.arg(key)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Get DevEUIs for DevAddr")?;
|
||||
|
||||
@ -311,11 +313,11 @@ async fn get_dev_euis_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<EUI64>> {
|
||||
|
||||
async fn remove_dev_eui_from_dev_addr_set(dev_addr: DevAddr, dev_eui: EUI64) -> Result<()> {
|
||||
let key = redis_key(format!("devaddr:{{{}}}", dev_addr));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
redis::cmd("SREM")
|
||||
.arg(key)
|
||||
.arg(&dev_eui.to_be_bytes())
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
@ -10,12 +10,12 @@ use chirpstack_api::internal;
|
||||
pub async fn save(df: &internal::DownlinkFrame) -> Result<()> {
|
||||
let b = df.encode_to_vec();
|
||||
let key = redis_key(format!("frame:{}", df.downlink_id));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
redis::cmd("SETEX")
|
||||
.arg(key)
|
||||
.arg(30)
|
||||
.arg(b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(downlink_id = df.downlink_id, "Downlink-frame saved");
|
||||
@ -23,9 +23,11 @@ pub async fn save(df: &internal::DownlinkFrame) -> Result<()> {
|
||||
}
|
||||
|
||||
pub async fn get(id: u32) -> Result<internal::DownlinkFrame, Error> {
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let key = redis_key(format!("frame:{}", id));
|
||||
let v: Vec<u8> = redis::cmd("GET").arg(key).query_async(&mut c).await?;
|
||||
let v: Vec<u8> = redis::cmd("GET")
|
||||
.arg(key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
if v.is_empty() {
|
||||
return Err(Error::NotFound(format!("{}", id)));
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ use lrwn::EUI64;
|
||||
|
||||
pub async fn set_pending(dev_eui: &EUI64, cid: lrwn::CID, set: &lrwn::MACCommandSet) -> Result<()> {
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
|
||||
let ttl = conf.network.device_session_ttl.as_millis() as usize;
|
||||
@ -17,7 +16,7 @@ pub async fn set_pending(dev_eui: &EUI64, cid: lrwn::CID, set: &lrwn::MACCommand
|
||||
.arg(key)
|
||||
.arg(ttl)
|
||||
.arg(b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block set");
|
||||
@ -25,9 +24,11 @@ pub async fn set_pending(dev_eui: &EUI64, cid: lrwn::CID, set: &lrwn::MACCommand
|
||||
}
|
||||
|
||||
pub async fn get_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<Option<lrwn::MACCommandSet>> {
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
|
||||
let b: Vec<u8> = redis::cmd("GET").arg(key).query_async(&mut c).await?;
|
||||
let b: Vec<u8> = redis::cmd("GET")
|
||||
.arg(key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
let out = if !b.is_empty() {
|
||||
let mut mac = lrwn::MACCommandSet::from_slice(&b);
|
||||
@ -45,10 +46,12 @@ pub async fn get_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<Option<lrwn:
|
||||
}
|
||||
|
||||
pub async fn delete_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<()> {
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
|
||||
|
||||
redis::cmd("DEL").arg(key).query_async(&mut c).await?;
|
||||
redis::cmd("DEL")
|
||||
.arg(key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block deleted");
|
||||
Ok(())
|
||||
|
@ -71,12 +71,11 @@ pub async fn save_state(name: &str, state: &str) -> Result<()> {
|
||||
let key = redis_key(format!("metrics:{{{}}}", name));
|
||||
let ttl = get_ttl(Aggregation::MONTH);
|
||||
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
redis::cmd("PSETEX")
|
||||
.arg(key)
|
||||
.arg(ttl.as_millis() as usize)
|
||||
.arg(state)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(state = %state, "State saved");
|
||||
@ -124,7 +123,6 @@ async fn save_for_interval(a: Aggregation, name: &str, record: &Record) -> Resul
|
||||
.unwrap(),
|
||||
};
|
||||
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let key = get_key(&name, a, ts);
|
||||
let mut pipe = redis::pipe();
|
||||
pipe.atomic();
|
||||
@ -156,7 +154,7 @@ async fn save_for_interval(a: Aggregation, name: &str, record: &Record) -> Resul
|
||||
.arg(&key)
|
||||
.arg(ttl.as_millis() as usize)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(name = %name, aggregation = %a, "Metrics saved");
|
||||
@ -165,8 +163,11 @@ async fn save_for_interval(a: Aggregation, name: &str, record: &Record) -> Resul
|
||||
|
||||
pub async fn get_state(name: &str) -> Result<String> {
|
||||
let key = redis_key(format!("metrics:{{{}}}", name));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let v: Option<String> = redis::cmd("GET").arg(key).query_async(&mut c).await?;
|
||||
|
||||
let v: Option<String> = redis::cmd("GET")
|
||||
.arg(key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
Ok(v.unwrap_or_default())
|
||||
}
|
||||
|
||||
@ -258,14 +259,14 @@ pub async fn get(
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let mut pipe = redis::pipe();
|
||||
|
||||
for k in &keys {
|
||||
pipe.cmd("HGETALL").arg(k);
|
||||
}
|
||||
|
||||
let res: Vec<HashMap<String, f64>> = pipe.query_async(&mut c).await?;
|
||||
let res: Vec<HashMap<String, f64>> =
|
||||
pipe.query_async(&mut get_async_redis_conn().await?).await?;
|
||||
let mut out: Vec<Record> = Vec::new();
|
||||
|
||||
for (i, r) in res.iter().enumerate() {
|
||||
|
@ -250,8 +250,9 @@ pub async fn reset_db() -> Result<()> {
|
||||
|
||||
#[cfg(test)]
|
||||
pub async fn reset_redis() -> Result<()> {
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
redis::cmd("FLUSHDB").query_async(&mut c).await?;
|
||||
redis::cmd("FLUSHDB")
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -45,8 +45,6 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> {
|
||||
let ttl = conf.network.device_session_ttl.as_millis() as usize;
|
||||
let pr_ttl = lifetime.num_milliseconds() as usize;
|
||||
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
// We need to store a pointer from both the DevAddr and DevEUI to the
|
||||
// passive-roaming device-session ID. This is needed:
|
||||
// * Because the DevAddr is not guaranteed to be unique
|
||||
@ -80,7 +78,7 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> {
|
||||
.arg(pr_ttl)
|
||||
.arg(b)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(id = %sess_id, "Passive-roaming device-session saved");
|
||||
@ -90,10 +88,10 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> {
|
||||
|
||||
pub async fn get(id: Uuid) -> Result<internal::PassiveRoamingDeviceSession, Error> {
|
||||
let key = redis_key(format!("pr:sess:{{{}}}", id));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let v: Vec<u8> = redis::cmd("GET")
|
||||
.arg(key)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Get passive-roaming device-session")?;
|
||||
if v.is_empty() {
|
||||
@ -106,8 +104,11 @@ pub async fn get(id: Uuid) -> Result<internal::PassiveRoamingDeviceSession, Erro
|
||||
|
||||
pub async fn delete(id: Uuid) -> Result<()> {
|
||||
let key = redis_key(format!("pr:sess:{{{}}}", id));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
redis::cmd("DEL").arg(&key).query_async(&mut c).await?;
|
||||
|
||||
redis::cmd("DEL")
|
||||
.arg(&key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
info!(id = %id, "Passive-roaming device-session deleted");
|
||||
Ok(())
|
||||
@ -181,8 +182,11 @@ async fn get_sessions_for_dev_addr(
|
||||
|
||||
async fn get_session_ids_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<Uuid>> {
|
||||
let key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let v: Vec<String> = redis::cmd("SMEMBERS").arg(key).query_async(&mut c).await?;
|
||||
|
||||
let v: Vec<String> = redis::cmd("SMEMBERS")
|
||||
.arg(key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
let mut out: Vec<Uuid> = Vec::new();
|
||||
for id in &v {
|
||||
@ -194,8 +198,11 @@ async fn get_session_ids_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<Uuid>> {
|
||||
|
||||
pub async fn get_session_ids_for_dev_eui(dev_eui: EUI64) -> Result<Vec<Uuid>> {
|
||||
let key = redis_key(format!("pr:dev:{{{}}}", dev_eui));
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let v: Vec<String> = redis::cmd("SMEMBERS").arg(key).query_async(&mut c).await?;
|
||||
|
||||
let v: Vec<String> = redis::cmd("SMEMBERS")
|
||||
.arg(key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
let mut out: Vec<Uuid> = Vec::new();
|
||||
for id in &v {
|
||||
|
@ -7,7 +7,6 @@ use chirpstack_api::stream;
|
||||
|
||||
pub async fn log_request(pl: &stream::ApiRequestLog) -> Result<()> {
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
if conf.monitoring.api_request_log_max_history == 0 {
|
||||
return Ok(());
|
||||
@ -22,7 +21,7 @@ pub async fn log_request(pl: &stream::ApiRequestLog) -> Result<()> {
|
||||
.arg("*")
|
||||
.arg("request")
|
||||
.arg(&b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
@ -49,7 +48,6 @@ mod tests {
|
||||
};
|
||||
log_request(&pl).await.unwrap();
|
||||
|
||||
let mut c = get_async_redis_conn().await.unwrap();
|
||||
let key = redis_key("api:stream:request".to_string());
|
||||
let srr: StreamReadReply = redis::cmd("XREAD")
|
||||
.arg("COUNT")
|
||||
@ -57,7 +55,7 @@ mod tests {
|
||||
.arg("STREAMS")
|
||||
.arg(&key)
|
||||
.arg("0")
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -30,7 +30,6 @@ pub async fn get_log_sender() -> Option<Sender<stream::BackendInterfacesRequest>
|
||||
|
||||
pub async fn log_request(pl: stream::BackendInterfacesRequest) -> Result<()> {
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
if conf.monitoring.backend_interfaces_log_max_history == 0 {
|
||||
return Ok(());
|
||||
@ -45,7 +44,7 @@ pub async fn log_request(pl: stream::BackendInterfacesRequest) -> Result<()> {
|
||||
.arg("*")
|
||||
.arg("request")
|
||||
.arg(&b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
@ -17,7 +17,6 @@ use chirpstack_api::{api, integration};
|
||||
|
||||
pub async fn log_event_for_device(typ: &str, dev_eui: &str, b: &[u8]) -> Result<()> {
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
// per device stream
|
||||
if conf.monitoring.per_device_event_log_max_history > 0 {
|
||||
@ -36,7 +35,7 @@ pub async fn log_event_for_device(typ: &str, dev_eui: &str, b: &[u8]) -> Result<
|
||||
.arg(&key)
|
||||
.arg(conf.monitoring.per_device_event_log_ttl.as_millis() as usize)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -50,7 +49,7 @@ pub async fn log_event_for_device(typ: &str, dev_eui: &str, b: &[u8]) -> Result<
|
||||
.arg("*")
|
||||
.arg(typ)
|
||||
.arg(b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -63,7 +62,6 @@ pub async fn get_event_logs(
|
||||
channel: mpsc::Sender<api::LogItem>,
|
||||
) -> Result<()> {
|
||||
let mut last_id = "0".to_string();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
loop {
|
||||
if channel.is_closed() {
|
||||
@ -77,7 +75,7 @@ pub async fn get_event_logs(
|
||||
.arg("STREAMS")
|
||||
.arg(&key)
|
||||
.arg(&last_id)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("XREAD event stream")?;
|
||||
|
||||
|
@ -19,7 +19,6 @@ use chirpstack_api::{api, stream};
|
||||
|
||||
pub async fn log_uplink_for_gateways(ufl: &stream::UplinkFrameLog) -> Result<()> {
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
for rx_info in &ufl.rx_info {
|
||||
let gateway_id = EUI64::from_str(&rx_info.gateway_id)?;
|
||||
@ -56,7 +55,7 @@ pub async fn log_uplink_for_gateways(ufl: &stream::UplinkFrameLog) -> Result<()>
|
||||
.arg(&key)
|
||||
.arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -70,7 +69,7 @@ pub async fn log_uplink_for_gateways(ufl: &stream::UplinkFrameLog) -> Result<()>
|
||||
.arg("*")
|
||||
.arg("up")
|
||||
.arg(&b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
@ -84,7 +83,7 @@ pub async fn log_downlink_for_gateway(dfl: &stream::DownlinkFrameLog) -> Result<
|
||||
}
|
||||
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let b = dfl.encode_to_vec();
|
||||
|
||||
// per gateway stream
|
||||
@ -104,7 +103,7 @@ pub async fn log_downlink_for_gateway(dfl: &stream::DownlinkFrameLog) -> Result<
|
||||
.arg(&key)
|
||||
.arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -118,7 +117,7 @@ pub async fn log_downlink_for_gateway(dfl: &stream::DownlinkFrameLog) -> Result<
|
||||
.arg("*")
|
||||
.arg("down")
|
||||
.arg(&b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -131,7 +130,7 @@ pub async fn log_uplink_for_device(ufl: &stream::UplinkFrameLog) -> Result<()> {
|
||||
}
|
||||
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let b = ufl.encode_to_vec();
|
||||
|
||||
// per device stream
|
||||
@ -152,7 +151,7 @@ pub async fn log_uplink_for_device(ufl: &stream::UplinkFrameLog) -> Result<()> {
|
||||
.arg(&key)
|
||||
.arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -166,7 +165,7 @@ pub async fn log_uplink_for_device(ufl: &stream::UplinkFrameLog) -> Result<()> {
|
||||
.arg("*")
|
||||
.arg("up")
|
||||
.arg(&b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -179,7 +178,7 @@ pub async fn log_downlink_for_device(dfl: &stream::DownlinkFrameLog) -> Result<(
|
||||
}
|
||||
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let b = dfl.encode_to_vec();
|
||||
|
||||
// per device stream
|
||||
@ -200,7 +199,7 @@ pub async fn log_downlink_for_device(dfl: &stream::DownlinkFrameLog) -> Result<(
|
||||
.arg(&key)
|
||||
.arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -214,7 +213,7 @@ pub async fn log_downlink_for_device(dfl: &stream::DownlinkFrameLog) -> Result<(
|
||||
.arg("*")
|
||||
.arg("down")
|
||||
.arg(&b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -227,7 +226,6 @@ pub async fn get_frame_logs(
|
||||
channel: mpsc::Sender<api::LogItem>,
|
||||
) -> Result<()> {
|
||||
let mut last_id = "0".to_string();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
loop {
|
||||
if channel.is_closed() {
|
||||
@ -241,7 +239,7 @@ pub async fn get_frame_logs(
|
||||
.arg("STREAMS")
|
||||
.arg(&key)
|
||||
.arg(&last_id)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("XREAD frame stream")?;
|
||||
|
||||
|
@ -7,7 +7,6 @@ use chirpstack_api::stream;
|
||||
|
||||
pub async fn log_uplink(up: &stream::UplinkMeta) -> Result<()> {
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
if conf.monitoring.meta_log_max_history > 0 {
|
||||
let key = redis_key("stream:meta".to_string());
|
||||
@ -19,7 +18,7 @@ pub async fn log_uplink(up: &stream::UplinkMeta) -> Result<()> {
|
||||
.arg("*")
|
||||
.arg("up")
|
||||
.arg(&b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -28,7 +27,6 @@ pub async fn log_uplink(up: &stream::UplinkMeta) -> Result<()> {
|
||||
|
||||
pub async fn log_downlink(down: &stream::DownlinkMeta) -> Result<()> {
|
||||
let conf = config::get();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
if conf.monitoring.meta_log_max_history > 0 {
|
||||
let key = redis_key("stream:meta".to_string());
|
||||
@ -41,7 +39,7 @@ pub async fn log_downlink(down: &stream::DownlinkMeta) -> Result<()> {
|
||||
.arg("*")
|
||||
.arg("down")
|
||||
.arg(&b)
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
|
@ -397,7 +397,6 @@ pub fn uplink_meta_log(um: stream::UplinkMeta) -> Validator {
|
||||
Box::new(move || {
|
||||
let um = um.clone();
|
||||
Box::pin(async move {
|
||||
let mut c = get_async_redis_conn().await.unwrap();
|
||||
let key = redis_key("stream:meta".to_string());
|
||||
let srr: StreamReadReply = redis::cmd("XREAD")
|
||||
.arg("COUNT")
|
||||
@ -405,7 +404,7 @@ pub fn uplink_meta_log(um: stream::UplinkMeta) -> Validator {
|
||||
.arg("STREAMS")
|
||||
.arg(&key)
|
||||
.arg("0")
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@ -434,7 +433,6 @@ pub fn device_uplink_frame_log(uf: stream::UplinkFrameLog) -> Validator {
|
||||
Box::new(move || {
|
||||
let uf = uf.clone();
|
||||
Box::pin(async move {
|
||||
let mut c = get_async_redis_conn().await.unwrap();
|
||||
let key = redis_key(format!("device:{{{}}}:stream:frame", uf.dev_eui));
|
||||
let srr: StreamReadReply = redis::cmd("XREAD")
|
||||
.arg("COUNT")
|
||||
@ -442,7 +440,7 @@ pub fn device_uplink_frame_log(uf: stream::UplinkFrameLog) -> Validator {
|
||||
.arg("STREAMS")
|
||||
.arg(&key)
|
||||
.arg("0")
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -221,7 +221,6 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> {
|
||||
|
||||
async fn deduplicate_put(key: &str, ttl: Duration, event: &gw::UplinkFrame) -> Result<()> {
|
||||
let event_b = event.encode_to_vec();
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
redis::pipe()
|
||||
.atomic()
|
||||
@ -233,7 +232,7 @@ async fn deduplicate_put(key: &str, ttl: Duration, event: &gw::UplinkFrame) -> R
|
||||
.arg(key)
|
||||
.arg(ttl.as_millis() as usize)
|
||||
.ignore()
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Deduplication put")?;
|
||||
|
||||
@ -241,15 +240,13 @@ async fn deduplicate_put(key: &str, ttl: Duration, event: &gw::UplinkFrame) -> R
|
||||
}
|
||||
|
||||
async fn deduplicate_locked(key: &str, ttl: Duration) -> Result<bool> {
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
|
||||
let set: bool = redis::cmd("SET")
|
||||
.arg(key)
|
||||
.arg("lock")
|
||||
.arg("PX")
|
||||
.arg(ttl.as_millis() as usize)
|
||||
.arg("NX")
|
||||
.query_async(&mut c)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Deduplication locked")?;
|
||||
|
||||
@ -257,12 +254,13 @@ async fn deduplicate_locked(key: &str, ttl: Duration) -> Result<bool> {
|
||||
}
|
||||
|
||||
async fn deduplicate_collect(key: &str) -> Result<gw::UplinkFrameSet> {
|
||||
let mut c = get_async_redis_conn().await?;
|
||||
let items_b: Vec<Vec<u8>> = redis::cmd("SMEMBERS")
|
||||
.arg(&key)
|
||||
.query_async(&mut c)
|
||||
.await
|
||||
.context("Deduplication collect")?;
|
||||
let items_b: Vec<Vec<u8>> = {
|
||||
redis::cmd("SMEMBERS")
|
||||
.arg(&key)
|
||||
.query_async(&mut get_async_redis_conn().await?)
|
||||
.await
|
||||
.context("Deduplication collect")?
|
||||
};
|
||||
|
||||
if items_b.is_empty() {
|
||||
return Err(anyhow!("Zero items in collect set"));
|
||||
|
Loading…
x
Reference in New Issue
Block a user