From 345d0d8462de18922f8ae650513da2592b000c85 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Tue, 28 Nov 2023 13:23:15 +0000 Subject: [PATCH] Refactor code to use async redis. --- Cargo.lock | 63 +- backend/src/lib.rs | 34 +- chirpstack/Cargo.toml | 5 +- chirpstack/src/api/backend/mod.rs | 75 +-- chirpstack/src/api/error.rs | 6 - chirpstack/src/api/monitoring/mod.rs | 15 +- chirpstack/src/api/oidc.rs | 52 +- chirpstack/src/backend/joinserver.rs | 6 +- chirpstack/src/backend/mod.rs | 6 +- chirpstack/src/backend/roaming.rs | 35 +- chirpstack/src/cmd/root.rs | 2 +- chirpstack/src/downlink/data.rs | 2 +- chirpstack/src/gateway/backend/mqtt.rs | 27 +- .../src/integration/loracloud/buffer.rs | 140 ++--- chirpstack/src/integration/loracloud/mod.rs | 2 +- chirpstack/src/integration/redis.rs | 25 +- chirpstack/src/storage/device_gateway.rs | 106 ++-- chirpstack/src/storage/device_session.rs | 192 +++--- chirpstack/src/storage/downlink_frame.rs | 45 +- chirpstack/src/storage/mac_command.rs | 79 +-- chirpstack/src/storage/metrics.rs | 236 ++++--- chirpstack/src/storage/mod.rs | 198 ++---- chirpstack/src/storage/passive_roaming.rs | 181 +++--- chirpstack/src/stream/api_request.rs | 48 +- chirpstack/src/stream/backend_interfaces.rs | 66 +- chirpstack/src/stream/event.rs | 530 ++++++++-------- chirpstack/src/stream/frame.rs | 589 +++++++++--------- chirpstack/src/stream/meta.rs | 77 +-- chirpstack/src/test/assert.rs | 12 +- chirpstack/src/test/class_a_pr_test.rs | 17 +- chirpstack/src/test/otaa_js_test.rs | 2 +- chirpstack/src/test/otaa_pr_test.rs | 15 +- chirpstack/src/uplink/data_fns.rs | 4 +- chirpstack/src/uplink/join_fns.rs | 6 +- chirpstack/src/uplink/mod.rs | 129 ++-- 35 files changed, 1367 insertions(+), 1660 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64ce0036..84fbefbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -758,6 +758,8 @@ dependencies = [ "chirpstack_api", "chrono", "clap", + "deadpool-redis", + "deadpool-redis-cluster", "diesel", "diesel-async", "diesel_migrations", @@ -789,7 +791,6 @@ dependencies = [ "prometheus-client", "prost", "prost-types", - "r2d2", "rand", "rand_core", "rdkafka", @@ -1191,11 +1192,35 @@ dependencies = [ "tokio", ] +[[package]] +name = "deadpool-redis" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84930e585871d35b8e06d3e03d03e3a8a4c5dc71afa4376c7cd5f9223e1da1ea" +dependencies = [ + "deadpool", + "redis", +] + +[[package]] +name = "deadpool-redis-cluster" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "857c968579c82072dff24f48969d24fb1daab4970d94bb740d87ebb13bc8c2c4" +dependencies = [ + "deadpool", + "redis", + "redis_cluster_async", +] + [[package]] name = "deadpool-runtime" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" +dependencies = [ + "tokio", +] [[package]] name = "der" @@ -3354,17 +3379,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "r2d2" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot", - "scheduled-thread-pool", -] - [[package]] name = "rand" version = "0.8.5" @@ -3461,7 +3475,6 @@ dependencies = [ "log", "percent-encoding", "pin-project-lite", - "r2d2", "rand", "rustls", "rustls-native-certs", @@ -3474,6 +3487,21 @@ dependencies = [ "url", ] +[[package]] +name = "redis_cluster_async" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093073cc58cbe376f3308c530edcda1a49ef980de1c32f3fa63622fc5c6f0fb9" +dependencies = [ + "crc16", + "futures", + "log", + "pin-project-lite", + "rand", + "redis", + "tokio", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -3832,15 +3860,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" -dependencies = [ - "parking_lot", -] - [[package]] name = "scoped-futures" version = "0.1.3" diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 95d50191..b1a22f05 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -2,9 +2,7 @@ extern crate anyhow; use std::fs::File; -use std::future::Future; use std::io::Read; -use std::pin::Pin; use std::time::Duration; use aes_kw::Kek; @@ -13,6 +11,7 @@ use chrono::{DateTime, Utc}; use reqwest::header::{HeaderMap, AUTHORIZATION, CONTENT_TYPE}; use reqwest::{Certificate, Identity}; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Receiver; use tracing::{debug, error, info, span, trace, Instrument, Level}; @@ -35,14 +34,6 @@ pub trait BasePayloadResultProvider { fn base_payload(&self) -> &BasePayloadResult; } -pub type RequestLogFn = Box< - dyn Fn( - stream::BackendInterfacesRequest, - ) -> Pin> + Sync + Send>> - + Sync - + Send, ->; - pub struct ClientConfig { pub sender_id: Vec, pub receiver_id: Vec, @@ -63,7 +54,7 @@ pub struct ClientConfig { pub use_target_role_suffix: bool, // Request log function. - pub request_log_fn: Option, + pub request_log_sender: Option>, } impl Default for ClientConfig { @@ -78,7 +69,7 @@ impl Default for ClientConfig { authorization: None, async_timeout: Duration::from_secs(0), use_target_role_suffix: false, - request_log_fn: None, + request_log_sender: None, } } } @@ -345,9 +336,12 @@ impl Client { be_req_log.request_error = format!("{:#}", e); } - if let Some(log_fn) = &self.config.request_log_fn { - if let Err(e) = log_fn(be_req_log).await { - error!(error = %e, "Log request error"); + if let Some(tx) = &self.config.request_log_sender { + // We use try_send here as we don't want to delay the response in case + // there is no channel capacity. This would also log an error, proving + // feedback that there is a channel capacity issue. + if let Err(e) = tx.try_send(be_req_log) { + error!(error = %e, "Sending request-log to stream error"); } } @@ -1452,10 +1446,7 @@ pub mod test { let c = Client::new(ClientConfig { sender_id: vec![1, 2, 3], server: server.url("/"), - request_log_fn: Some(Box::new(move |log| { - let tx = tx.clone(); - Box::pin(async move { tx.send(log).await.map_err(|e| anyhow!("{}", e)) }) - })), + request_log_sender: Some(tx), ..Default::default() }) .unwrap(); @@ -1517,10 +1508,7 @@ pub mod test { let c = Client::new(ClientConfig { sender_id: vec![1, 2, 3], server: server.url("/"), - request_log_fn: Some(Box::new(move |log| { - let tx = tx.clone(); - Box::pin(async move { tx.send(log).await.map_err(|e| anyhow!("{}", e)) }) - })), + request_log_sender: Some(tx), ..Default::default() }) .unwrap(); diff --git a/chirpstack/Cargo.toml b/chirpstack/Cargo.toml index d6f16e79..2c6f6c58 100644 --- a/chirpstack/Cargo.toml +++ b/chirpstack/Cargo.toml @@ -36,9 +36,10 @@ diesel_migrations = { version = "2.1" } diesel-async = { version = "0.4", features = ["deadpool", "postgres", "async-connection-wrapper"] } tokio-postgres = "0.7" tokio-postgres-rustls = "0.10.0" -r2d2 = "0.8" bigdecimal = "0.4" -redis = { version = "0.23", features = ["r2d2", "cluster", "tls-rustls"] } +redis = { version = "0.23", features = ["cluster", "tls-rustls", "tokio-rustls-comp"] } +deadpool-redis = "0.13" +deadpool-redis-cluster = "0.1" # Logging tracing = "0.1" diff --git a/chirpstack/src/api/backend/mod.rs b/chirpstack/src/api/backend/mod.rs index 56360424..1167c173 100644 --- a/chirpstack/src/api/backend/mod.rs +++ b/chirpstack/src/api/backend/mod.rs @@ -18,7 +18,7 @@ use crate::backend::{joinserver, keywrap, roaming}; use crate::downlink::data_fns; use crate::helpers::errors::PrintFullError; use crate::storage::{ - device_session, error::Error as StorageError, get_redis_conn, passive_roaming, redis_key, + device_session, error::Error as StorageError, get_async_redis_conn, passive_roaming, redis_key, }; use crate::uplink::{ data_sns, error::Error as UplinkError, helpers, join_sns, RoamingMetaData, UplinkFrameSet, @@ -127,7 +127,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec) -> http::Response v, Err(_) => { warn!("Unknown SenderID"); @@ -523,33 +523,26 @@ async fn _handle_xmit_data_req( } async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result> { - task::spawn_blocking({ - let b = b.to_vec(); - let transaction_id = bp.transaction_id; - move || -> Result<()> { - let mut c = get_redis_conn()?; - let key = redis_key(format!("backend:async:{}", transaction_id)); + let transaction_id = bp.transaction_id; + let mut c = get_async_redis_conn().await?; + let key = redis_key(format!("backend:async:{}", transaction_id)); - c.new_pipeline() - .atomic() - .cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(1_i64) - .arg("*") - .arg("pl") - .arg(&b) - .ignore() - .cmd("EXPIRE") - .arg(&key) - .arg(30_i64) - .ignore() - .query(&mut c)?; - - Ok(()) - } - }) - .await??; + redis::pipe() + .atomic() + .cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(1_i64) + .arg("*") + .arg("pl") + .arg(b) + .ignore() + .cmd("EXPIRE") + .arg(&key) + .arg(30_i64) + .ignore() + .query_async(&mut c) + .await?; Ok(warp::reply().into_response()) } @@ -560,11 +553,17 @@ pub async fn get_async_receiver( ) -> Result>> { let (tx, rx) = oneshot::channel(); - task::spawn_blocking(move || -> Result<()> { - let mut c = get_redis_conn()?; + task::spawn(async move { + let mut c = match get_async_redis_conn().await { + Ok(v) => v, + Err(e) => { + error!(error = %e, "Get Redis connection error"); + return; + } + }; let key = redis_key(format!("backend:async:{}", transaction_id)); - let srr: StreamReadReply = redis::cmd("XREAD") + let srr: StreamReadReply = match redis::cmd("XREAD") .arg("BLOCK") .arg(timeout.as_millis() as u64) .arg("COUNT") @@ -572,7 +571,15 @@ pub async fn get_async_receiver( .arg("STREAMS") .arg(&key) .arg("0") - .query(&mut *c)?; + .query_async(&mut c) + .await + { + Ok(v) => v, + Err(e) => { + error!(error = %e, "Read from Redis Stream error"); + return; + } + }; for stream_key in &srr.keys { for stream_id in &stream_key.ids { @@ -581,7 +588,7 @@ pub async fn get_async_receiver( "pl" => { if let redis::Value::Data(b) = v { let _ = tx.send(b.to_vec()); - return Ok(()); + return; } } _ => { @@ -595,8 +602,6 @@ pub async fn get_async_receiver( } } } - - Ok(()) }); Ok(rx) diff --git a/chirpstack/src/api/error.rs b/chirpstack/src/api/error.rs index 3889794a..eb344efe 100644 --- a/chirpstack/src/api/error.rs +++ b/chirpstack/src/api/error.rs @@ -63,12 +63,6 @@ impl ToStatus for uuid::Error { } } -impl ToStatus for r2d2::Error { - fn status(&self) -> Status { - Status::new(Code::Internal, format!("{:#}", self)) - } -} - impl ToStatus for lrwn::Error { fn status(&self) -> Status { Status::new(Code::Internal, format!("{:#}", self)) diff --git a/chirpstack/src/api/monitoring/mod.rs b/chirpstack/src/api/monitoring/mod.rs index 6b1dfe1b..93846a49 100644 --- a/chirpstack/src/api/monitoring/mod.rs +++ b/chirpstack/src/api/monitoring/mod.rs @@ -3,13 +3,12 @@ use std::net::SocketAddr; use anyhow::{Context, Result}; use diesel_async::RunQueryDsl; -use tokio::task; use tracing::info; use warp::{http::Response, http::StatusCode, Filter}; use crate::config; use crate::monitoring::prometheus; -use crate::storage::{get_async_db_conn, get_redis_conn}; +use crate::storage::{get_async_db_conn, get_async_redis_conn}; pub async fn setup() { let conf = config::get(); @@ -56,12 +55,8 @@ async fn _health_handler() -> Result<()> { .await .context("PostgreSQL connection error")?; - task::spawn_blocking(move || -> Result<()> { - let mut r = get_redis_conn()?; - if !r.check_connection() { - return Err(anyhow!("Redis connection error")); - } - Ok(()) - }) - .await? + let mut r = get_async_redis_conn().await?; + let _: String = redis::cmd("PING").query_async(&mut r).await?; + + Ok(()) } diff --git a/chirpstack/src/api/oidc.rs b/chirpstack/src/api/oidc.rs index 012a2089..708317d1 100644 --- a/chirpstack/src/api/oidc.rs +++ b/chirpstack/src/api/oidc.rs @@ -13,13 +13,12 @@ use openidconnect::{ }; use openidconnect::{EmptyAdditionalClaims, UserInfoClaims}; use serde::{Deserialize, Serialize}; -use tokio::task; use tracing::{error, trace}; use warp::{Rejection, Reply}; use crate::config; use crate::helpers::errors::PrintFullError; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; pub type User = UserInfoClaims; @@ -133,41 +132,30 @@ async fn get_client() -> Result { } async fn store_nonce(state: &CsrfToken, nonce: &Nonce) -> Result<()> { - task::spawn_blocking({ - let state = state.clone(); - let nonce = nonce.clone(); - move || -> Result<()> { - trace!("Storing nonce"); - let key = redis_key(format!("auth:oidc:{}", state.secret())); - let mut c = get_redis_conn()?; + trace!("Storing nonce"); + let key = redis_key(format!("auth:oidc:{}", state.secret())); + let mut c = get_async_redis_conn().await?; - redis::cmd("PSETEX") - .arg(key) - .arg(Duration::minutes(5).num_milliseconds()) - .arg(nonce.secret()) - .query(&mut *c)?; + redis::cmd("PSETEX") + .arg(key) + .arg(Duration::minutes(5).num_milliseconds()) + .arg(nonce.secret()) + .query_async(&mut c) + .await?; - Ok(()) - } - }) - .await? + Ok(()) } async fn get_nonce(state: &CsrfToken) -> Result { - task::spawn_blocking({ - let state = state.clone(); - move || -> Result { - trace!("Getting nonce"); - let key = redis_key(format!("auth:oidc:{}", state.secret())); - let mut c = get_redis_conn()?; + trace!("Getting nonce"); + let key = redis_key(format!("auth:oidc:{}", state.secret())); + let mut c = get_async_redis_conn().await?; - let v: String = redis::cmd("GET") - .arg(&key) - .query(&mut *c) - .context("Get nonce")?; + let v: String = redis::cmd("GET") + .arg(&key) + .query_async(&mut c) + .await + .context("Get nonce")?; - Ok(Nonce::new(v)) - } - }) - .await? + Ok(Nonce::new(v)) } diff --git a/chirpstack/src/backend/joinserver.rs b/chirpstack/src/backend/joinserver.rs index e8239770..ae85ead8 100644 --- a/chirpstack/src/backend/joinserver.rs +++ b/chirpstack/src/backend/joinserver.rs @@ -11,7 +11,7 @@ lazy_static! { static ref CLIENTS: RwLock)>> = RwLock::new(vec![]); } -pub fn setup() -> Result<()> { +pub async fn setup() -> Result<()> { info!("Setting up Join Server clients"); let conf = config::get(); @@ -28,9 +28,7 @@ pub fn setup() -> Result<()> { tls_cert: js.tls_cert.clone(), tls_key: js.tls_key.clone(), async_timeout: js.async_timeout, - request_log_fn: Some(Box::new(move |log| { - Box::pin(async move { stream::backend_interfaces::log_request(log).await }) - })), + request_log_sender: stream::backend_interfaces::get_log_sender().await, ..Default::default() })?; diff --git a/chirpstack/src/backend/mod.rs b/chirpstack/src/backend/mod.rs index a950f3f8..c65b418b 100644 --- a/chirpstack/src/backend/mod.rs +++ b/chirpstack/src/backend/mod.rs @@ -4,9 +4,9 @@ pub mod joinserver; pub mod keywrap; pub mod roaming; -pub fn setup() -> Result<()> { - joinserver::setup()?; - roaming::setup()?; +pub async fn setup() -> Result<()> { + joinserver::setup().await?; + roaming::setup().await?; Ok(()) } diff --git a/chirpstack/src/backend/roaming.rs b/chirpstack/src/backend/roaming.rs index f67058d5..1a721735 100644 --- a/chirpstack/src/backend/roaming.rs +++ b/chirpstack/src/backend/roaming.rs @@ -1,11 +1,12 @@ use std::collections::HashMap; use std::io::Cursor; use std::str::FromStr; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use anyhow::Result; use chrono::{Duration, DurationRound}; use prost::Message; +use tokio::sync::RwLock; use tracing::{debug, info, span, Level}; use crate::gpstime::ToGpsTime; @@ -18,7 +19,7 @@ lazy_static! { static ref CLIENTS: RwLock>> = RwLock::new(HashMap::new()); } -pub fn setup() -> Result<()> { +pub async fn setup() -> Result<()> { info!("Setting up roaming clients"); let conf = config::get(); @@ -56,26 +57,24 @@ pub fn setup() -> Result<()> { Some(s.authorization_header.clone()) }, async_timeout: s.async_timeout, - request_log_fn: Some(Box::new(move |log| { - Box::pin(async move { stream::backend_interfaces::log_request(log).await }) - })), + request_log_sender: stream::backend_interfaces::get_log_sender().await, })?; - set(&s.net_id, c); + set(&s.net_id, c).await; } Ok(()) } -pub fn set(net_id: &NetID, c: Client) { - let mut clients_w = CLIENTS.write().unwrap(); +pub async fn set(net_id: &NetID, c: Client) { + let mut clients_w = CLIENTS.write().await; clients_w.insert(*net_id, Arc::new(c)); } -pub fn get(net_id: &NetID) -> Result> { - let clients_r = CLIENTS.write().unwrap(); +pub async fn get(net_id: &NetID) -> Result> { + let mut clients_w = CLIENTS.write().await; - if let Some(client) = clients_r.get(net_id) { + if let Some(client) = clients_w.get(net_id) { return Ok(client.clone()); } @@ -106,12 +105,14 @@ pub fn get(net_id: &NetID) -> Result> { Some(conf.roaming.default.authorization_header.clone()) }, async_timeout: conf.roaming.default.async_timeout, - request_log_fn: Some(Box::new(move |log| { - Box::pin(async move { stream::backend_interfaces::log_request(log).await }) - })), + request_log_sender: stream::backend_interfaces::get_log_sender().await, })?; - return Ok(Arc::new(c)); + let c = Arc::new(c); + let c_out = c.clone(); + clients_w.insert(*net_id, c); + + return Ok(c_out); } Err(anyhow!( @@ -330,8 +331,8 @@ pub fn dl_meta_data_to_uplink_rx_info( } #[cfg(test)] -pub fn reset() { - let mut clients_w = CLIENTS.write().unwrap(); +pub async fn reset() { + let mut clients_w = CLIENTS.write().await; *clients_w = HashMap::new(); } diff --git a/chirpstack/src/cmd/root.rs b/chirpstack/src/cmd/root.rs index f5bafb5e..7f4a513e 100644 --- a/chirpstack/src/cmd/root.rs +++ b/chirpstack/src/cmd/root.rs @@ -13,7 +13,7 @@ pub async fn run() -> Result<()> { storage::setup().await?; region::setup()?; - backend::setup()?; + backend::setup().await?; adr::setup().await?; integration::setup().await?; gateway::backend::setup().await?; diff --git a/chirpstack/src/downlink/data.rs b/chirpstack/src/downlink/data.rs index 379a177d..ec3b1813 100644 --- a/chirpstack/src/downlink/data.rs +++ b/chirpstack/src/downlink/data.rs @@ -1044,7 +1044,7 @@ impl Data { let roaming_meta = ufs.roaming_meta_data.as_ref().unwrap(); let net_id = NetID::from_slice(&roaming_meta.base_payload.sender_id)?; - let client = roaming::get(&net_id)?; + let client = roaming::get(&net_id).await?; let mut req = backend::XmitDataReqPayload { phy_payload: self.downlink_frame.items[0].phy_payload.clone(), diff --git a/chirpstack/src/gateway/backend/mqtt.rs b/chirpstack/src/gateway/backend/mqtt.rs index b7592f8b..2abd4977 100644 --- a/chirpstack/src/gateway/backend/mqtt.rs +++ b/chirpstack/src/gateway/backend/mqtt.rs @@ -19,13 +19,12 @@ use prost::Message; use rand::Rng; use serde::Serialize; use tokio::sync::mpsc; -use tokio::task; use tracing::{error, info, trace}; use super::GatewayBackend; use crate::config::GatewayBackendMqtt; use crate::monitoring::prometheus; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; use crate::{downlink, uplink}; use lrwn::region::CommonName; @@ -462,22 +461,18 @@ async fn message_callback( } async fn is_locked(key: String) -> Result { - task::spawn_blocking({ - move || -> Result { - let mut c = get_redis_conn()?; + let mut c = get_async_redis_conn().await?; - let set: bool = redis::cmd("SET") - .arg(key) - .arg("lock") - .arg("PX") - .arg(5000) - .arg("NX") - .query(&mut *c)?; + let set: bool = redis::cmd("SET") + .arg(key) + .arg("lock") + .arg("PX") + .arg(5000) + .arg("NX") + .query_async(&mut c) + .await?; - Ok(!set) - } - }) - .await? + Ok(!set) } fn gateway_is_json(gateway_id: &str) -> bool { diff --git a/chirpstack/src/integration/loracloud/buffer.rs b/chirpstack/src/integration/loracloud/buffer.rs index 3be42a3d..6056b963 100644 --- a/chirpstack/src/integration/loracloud/buffer.rs +++ b/chirpstack/src/integration/loracloud/buffer.rs @@ -3,75 +3,67 @@ use std::io::Cursor; use anyhow::{Context, Result}; use chrono::{DateTime, Duration, Utc}; use prost::Message; -use tokio::task; use tracing::{info, trace}; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; use chirpstack_api::{gw, internal}; use lrwn::EUI64; pub async fn get_geoloc_buffer( dev_eui: &EUI64, - ttl: &Duration, + ttl: Duration, ) -> Result>> { - if *ttl == Duration::zero() { + if ttl == Duration::zero() { return Ok(Vec::new()); } - task::spawn_blocking({ - let dev_eui = *dev_eui; - let ttl = *ttl; + trace!(dev_eui = %dev_eui, "Getting geolocation buffer"); + let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui)); + let mut c = get_async_redis_conn().await?; - move || -> Result>> { - trace!(dev_eui = %dev_eui, "Getting geolocation buffer"); - let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui)); - let mut c = get_redis_conn()?; + let b: Vec = redis::cmd("GET") + .arg(key) + .query_async(&mut c) + .await + .context("Get geolocation buffer")?; + if b.is_empty() { + return Ok(Vec::new()); + } - let b: Vec = redis::cmd("GET") - .arg(key) - .query(&mut *c) - .context("Get geolocation buffer")?; - if b.is_empty() { - return Ok(Vec::new()); - } + let buffer = internal::LoraCloudGeolocBuffer::decode(&mut Cursor::new(b)) + .context("Decode geolocation buffer")?; - let buffer = internal::LoraCloudGeolocBuffer::decode(&mut Cursor::new(b)) - .context("Decode geolocation buffer")?; + let mut out: Vec> = Vec::new(); - let mut out: Vec> = Vec::new(); + for uplink in &buffer.uplinks { + let rx_info: Vec = uplink + .rx_info + .iter() + .cloned() + .filter(|rx_info| { + let ts: DateTime = match &rx_info.gw_time { + None => { + return false; + } + Some(v) => match v.clone().try_into() { + Ok(v) => v, + Err(_) => { + return false; + } + }, + }; - for uplink in &buffer.uplinks { - let rx_info: Vec = uplink - .rx_info - .iter() - .cloned() - .filter(|rx_info| { - let ts: DateTime = match &rx_info.gw_time { - None => { - return false; - } - Some(v) => match v.clone().try_into() { - Ok(v) => v, - Err(_) => { - return false; - } - }, - }; + // The interval between now and then must be smaller than the TTL + (ts - Utc::now()) < ttl + }) + .collect(); - // The interval between now and then must be smaller than the TTL - (ts - Utc::now()) < ttl - }) - .collect(); - - if rx_info.len() > 3 { - out.push(rx_info); - } - } - - Ok(out) + if rx_info.len() > 3 { + out.push(rx_info); } - }) - .await? + } + + Ok(out) } pub async fn save_geoloc_buffer( @@ -83,35 +75,27 @@ pub async fn save_geoloc_buffer( return Ok(()); } - task::spawn_blocking({ - let dev_eui = *dev_eui; - let ttl = *ttl; - let items = items.to_vec(); + trace!(dev_eui = %dev_eui, "Saving geolocation buffer"); + let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui)); + let mut c = get_async_redis_conn().await?; - move || -> Result<()> { - trace!(dev_eui = %dev_eui, "Saving geolocation buffer"); - let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui)); - let mut c = get_redis_conn()?; + let buffer = internal::LoraCloudGeolocBuffer { + uplinks: items + .iter() + .cloned() + .map(|rx_info| internal::LoraCloudGeolocBufferUplink { rx_info }) + .collect(), + }; + let b = buffer.encode_to_vec(); - let buffer = internal::LoraCloudGeolocBuffer { - uplinks: items - .iter() - .cloned() - .map(|rx_info| internal::LoraCloudGeolocBufferUplink { rx_info }) - .collect(), - }; - let b = buffer.encode_to_vec(); + redis::cmd("PSETEX") + .arg(key) + .arg(ttl.num_milliseconds()) + .arg(b) + .query_async(&mut c) + .await?; - redis::cmd("PSETEX") - .arg(key) - .arg(ttl.num_milliseconds()) - .arg(b) - .query(&mut *c)?; + info!(dev_eui = %dev_eui, "Geolocation buffer saved"); - info!(dev_eui = %dev_eui, "Geolocation buffer saved"); - - Ok(()) - } - }) - .await? + Ok(()) } diff --git a/chirpstack/src/integration/loracloud/mod.rs b/chirpstack/src/integration/loracloud/mod.rs index 82464ca3..192b719e 100644 --- a/chirpstack/src/integration/loracloud/mod.rs +++ b/chirpstack/src/integration/loracloud/mod.rs @@ -461,7 +461,7 @@ impl Integration { ); let mut buf = vec![pl.rx_info.clone()]; - buf.extend_from_slice(&buffer::get_geoloc_buffer(&dev_eui, &ttl).await?); + buf.extend_from_slice(&buffer::get_geoloc_buffer(&dev_eui, ttl).await?); buf.truncate( (self .config diff --git a/chirpstack/src/integration/redis.rs b/chirpstack/src/integration/redis.rs index d22a5715..51b3d743 100644 --- a/chirpstack/src/integration/redis.rs +++ b/chirpstack/src/integration/redis.rs @@ -128,7 +128,7 @@ impl IntegrationTrait for Integration { #[cfg(test)] pub mod test { use super::*; - use crate::storage::get_redis_conn; + use crate::storage::get_async_redis_conn; use crate::test; use chirpstack_api::integration; use redis::streams::StreamReadReply; @@ -150,7 +150,7 @@ pub mod test { ..Default::default() }; i.uplink_event(&HashMap::new(), &pl).await.unwrap(); - last_id = assert_reply(&last_id, "up", &pl.encode_to_vec()); + last_id = assert_reply(&last_id, "up", &pl.encode_to_vec()).await; // join let pl = integration::JoinEvent { @@ -162,7 +162,7 @@ pub mod test { ..Default::default() }; i.join_event(&HashMap::new(), &pl).await.unwrap(); - last_id = assert_reply(&last_id, "join", &pl.encode_to_vec()); + last_id = assert_reply(&last_id, "join", &pl.encode_to_vec()).await; // ack let pl = integration::AckEvent { @@ -174,7 +174,7 @@ pub mod test { ..Default::default() }; i.ack_event(&HashMap::new(), &pl).await.unwrap(); - last_id = assert_reply(&last_id, "ack", &pl.encode_to_vec()); + last_id = assert_reply(&last_id, "ack", &pl.encode_to_vec()).await; // txack let pl = integration::TxAckEvent { @@ -186,7 +186,7 @@ pub mod test { ..Default::default() }; i.txack_event(&HashMap::new(), &pl).await.unwrap(); - last_id = assert_reply(&last_id, "txack", &pl.encode_to_vec()); + last_id = assert_reply(&last_id, "txack", &pl.encode_to_vec()).await; // log let pl = integration::LogEvent { @@ -198,7 +198,7 @@ pub mod test { ..Default::default() }; i.log_event(&HashMap::new(), &pl).await.unwrap(); - last_id = assert_reply(&last_id, "log", &pl.encode_to_vec()); + last_id = assert_reply(&last_id, "log", &pl.encode_to_vec()).await; // status let pl = integration::StatusEvent { @@ -210,7 +210,7 @@ pub mod test { ..Default::default() }; i.status_event(&HashMap::new(), &pl).await.unwrap(); - last_id = assert_reply(&last_id, "status", &pl.encode_to_vec()); + last_id = assert_reply(&last_id, "status", &pl.encode_to_vec()).await; // location let pl = integration::LocationEvent { @@ -222,7 +222,7 @@ pub mod test { ..Default::default() }; i.location_event(&HashMap::new(), &pl).await.unwrap(); - last_id = assert_reply(&last_id, "location", &pl.encode_to_vec()); + last_id = assert_reply(&last_id, "location", &pl.encode_to_vec()).await; // integration let pl = integration::IntegrationEvent { @@ -234,18 +234,19 @@ pub mod test { ..Default::default() }; i.integration_event(&HashMap::new(), &pl).await.unwrap(); - let _ = assert_reply(&last_id, "integration", &pl.encode_to_vec()); + let _ = assert_reply(&last_id, "integration", &pl.encode_to_vec()).await; } - fn assert_reply(last_id: &str, event: &str, b: &[u8]) -> String { - let mut c = get_redis_conn().unwrap(); + async fn assert_reply(last_id: &str, event: &str, b: &[u8]) -> String { + let mut c = get_async_redis_conn().await.unwrap(); let srr: StreamReadReply = redis::cmd("XREAD") .arg("COUNT") .arg(1 as usize) .arg("STREAMS") .arg("device:stream:event") .arg(&last_id) - .query(&mut *c) + .query_async(&mut c) + .await .unwrap(); assert_eq!(1, srr.keys.len()); diff --git a/chirpstack/src/storage/device_gateway.rs b/chirpstack/src/storage/device_gateway.rs index 3a7ec079..4a87cdad 100644 --- a/chirpstack/src/storage/device_gateway.rs +++ b/chirpstack/src/storage/device_gateway.rs @@ -2,60 +2,46 @@ use std::io::Cursor; use anyhow::{Context, Result}; use prost::Message; -use tokio::task; use tracing::info; -use super::{error::Error, get_redis_conn, redis_key}; +use super::{error::Error, get_async_redis_conn, redis_key}; use crate::config; use chirpstack_api::internal; use lrwn::EUI64; pub async fn save_rx_info(rx_info: &internal::DeviceGatewayRxInfo) -> Result<()> { let dev_eui = EUI64::from_slice(&rx_info.dev_eui)?; - task::spawn_blocking({ - let rx_info = rx_info.clone(); - move || -> Result<()> { - let conf = config::get(); - let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui)); - let ttl = conf.network.device_session_ttl.as_millis() as usize; - let b = rx_info.encode_to_vec(); - let mut c = get_redis_conn()?; + let conf = config::get(); + let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui)); + let ttl = conf.network.device_session_ttl.as_millis() as usize; + let b = rx_info.encode_to_vec(); + let mut c = get_async_redis_conn().await?; - redis::cmd("PSETEX") - .arg(key) - .arg(ttl) - .arg(b) - .query(&mut *c)?; - - Ok(()) - } - }) - .await??; + redis::cmd("PSETEX") + .arg(key) + .arg(ttl) + .arg(b) + .query_async(&mut c) + .await?; info!(dev_eui = %dev_eui, "Gateway rx-info saved"); Ok(()) } pub async fn get_rx_info(dev_eui: &EUI64) -> Result { - task::spawn_blocking({ - let dev_eui = *dev_eui; - move || -> Result { - let mut c = get_redis_conn()?; - let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui)); + let mut c = get_async_redis_conn().await?; + let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui)); - let b: Vec = redis::cmd("GET") - .arg(key) - .query(&mut *c) - .context("Get rx-info")?; - if b.is_empty() { - return Err(Error::NotFound(dev_eui.to_string())); - } + let b: Vec = redis::cmd("GET") + .arg(key) + .query_async(&mut c) + .await + .context("Get rx-info")?; + if b.is_empty() { + return Err(Error::NotFound(dev_eui.to_string())); + } - Ok(internal::DeviceGatewayRxInfo::decode(&mut Cursor::new(b)) - .context("Decode rx-info")?) - } - }) - .await? + Ok(internal::DeviceGatewayRxInfo::decode(&mut Cursor::new(b)).context("Decode rx-info")?) } pub async fn get_rx_info_for_dev_euis( @@ -65,34 +51,28 @@ pub async fn get_rx_info_for_dev_euis( return Ok(Vec::new()); } - task::spawn_blocking({ - let dev_euis = dev_euis.to_vec(); - move || -> Result, Error> { - let mut c = get_redis_conn()?; - let mut keys: Vec = Vec::new(); - for dev_eui in &dev_euis { - keys.push(redis_key(format!("device:{{{}}}:gwrx", dev_eui))); - } + let mut c = get_async_redis_conn().await?; + let mut keys: Vec = Vec::new(); + for dev_eui in dev_euis { + keys.push(redis_key(format!("device:{{{}}}:gwrx", dev_eui))); + } - let bb: Vec> = redis::cmd("MGET") - .arg(keys) - .query(&mut *c) - .context("MGET")?; - let mut out: Vec = Vec::new(); - for b in bb { - if b.is_empty() { - continue; - } - - out.push( - internal::DeviceGatewayRxInfo::decode(&mut Cursor::new(b)) - .context("Decode rx-info")?, - ); - } - Ok(out) + let bb: Vec> = redis::cmd("MGET") + .arg(keys) + .query_async(&mut c) + .await + .context("MGET")?; + let mut out: Vec = Vec::new(); + for b in bb { + if b.is_empty() { + continue; } - }) - .await? + + out.push( + internal::DeviceGatewayRxInfo::decode(&mut Cursor::new(b)).context("Decode rx-info")?, + ); + } + Ok(out) } #[cfg(test)] diff --git a/chirpstack/src/storage/device_session.rs b/chirpstack/src/storage/device_session.rs index b7a36fac..1a389b24 100644 --- a/chirpstack/src/storage/device_session.rs +++ b/chirpstack/src/storage/device_session.rs @@ -3,11 +3,10 @@ use std::io::Cursor; use anyhow::{Context, Result}; use prost::Message; -use tokio::task; use tracing::{error, info, trace}; use super::error::Error; -use super::{get_redis_conn, redis_key}; +use super::{get_async_redis_conn, redis_key}; use crate::api::helpers::FromProto; use crate::config; use crate::helpers::errors::PrintFullError; @@ -24,96 +23,79 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> { let eui = EUI64::from_slice(&ds.dev_eui)?; let addr = DevAddr::from_slice(&ds.dev_addr)?; - task::spawn_blocking({ - let ds = ds.clone(); - move || -> Result<()> { - let conf = config::get(); - let addr_key = redis_key(format!("devaddr:{{{}}}", addr)); - let ds_key = redis_key(format!("device:{{{}}}:ds", eui)); - let b = ds.encode_to_vec(); - let ttl = conf.network.device_session_ttl.as_millis() as usize; - let mut c = get_redis_conn()?; + let conf = config::get(); + let addr_key = redis_key(format!("devaddr:{{{}}}", addr)); + let ds_key = redis_key(format!("device:{{{}}}:ds", eui)); + let b = ds.encode_to_vec(); + let ttl = conf.network.device_session_ttl.as_millis() as usize; + let mut c = get_async_redis_conn().await?; - // Atomic add and pexpire. - c.new_pipeline() - .atomic() - .cmd("SADD") - .arg(&addr_key) - .arg(&eui.to_be_bytes()) - .ignore() - .cmd("PEXPIRE") - .arg(&addr_key) - .arg(ttl) - .ignore() - .query(&mut c)?; + // Atomic add and pexpire. + redis::pipe() + .atomic() + .cmd("SADD") + .arg(&addr_key) + .arg(&eui.to_be_bytes()) + .ignore() + .cmd("PEXPIRE") + .arg(&addr_key) + .arg(ttl) + .ignore() + .query_async(&mut c) + .await?; - // In case there is a pending rejoin session, make sure that the new - // DevAddr also resolves to the device-session. - if let Some(pending_ds) = &ds.pending_rejoin_device_session { - let pending_addr = DevAddr::from_slice(&pending_ds.dev_addr)?; - let pending_addr_key = redis_key(format!("devaddr:{{{}}}", pending_addr)); + // In case there is a pending rejoin session, make sure that the new + // DevAddr also resolves to the device-session. + if let Some(pending_ds) = &ds.pending_rejoin_device_session { + let pending_addr = DevAddr::from_slice(&pending_ds.dev_addr)?; + let pending_addr_key = redis_key(format!("devaddr:{{{}}}", pending_addr)); - c.new_pipeline() - .atomic() - .cmd("SADD") - .arg(&pending_addr_key) - .arg(&eui.to_be_bytes()) - .ignore() - .cmd("PEXPIRE") - .arg(&pending_addr_key) - .arg(ttl) - .ignore() - .query(&mut c)?; - } + redis::pipe() + .atomic() + .cmd("SADD") + .arg(&pending_addr_key) + .arg(&eui.to_be_bytes()) + .ignore() + .cmd("PEXPIRE") + .arg(&pending_addr_key) + .arg(ttl) + .ignore() + .query_async(&mut c) + .await?; + } - redis::cmd("PSETEX") - .arg(ds_key) - .arg(ttl) - .arg(b) - .query(&mut *c)?; - - Ok(()) - } - }) - .await??; + redis::cmd("PSETEX") + .arg(ds_key) + .arg(ttl) + .arg(b) + .query_async(&mut c) + .await?; info!(dev_eui = %eui, dev_addr = %addr, "Device-session saved"); Ok(()) } pub async fn get(dev_eui: &EUI64) -> Result { - task::spawn_blocking({ - let dev_eui = *dev_eui; - move || -> Result { - let key = redis_key(format!("device:{{{}}}:ds", dev_eui)); - let mut c = get_redis_conn()?; - let v: Vec = redis::cmd("GET") - .arg(key) - .query(&mut *c) - .context("Get device-session")?; - if v.is_empty() { - return Err(Error::NotFound(dev_eui.to_string())); - } - let ds = chirpstack_api::internal::DeviceSession::decode(&mut Cursor::new(v)) - .context("Decode device-session")?; - Ok(ds) - } - }) - .await? + let key = redis_key(format!("device:{{{}}}:ds", dev_eui)); + let mut c = get_async_redis_conn().await?; + let v: Vec = redis::cmd("GET") + .arg(key) + .query_async(&mut c) + .await + .context("Get device-session")?; + if v.is_empty() { + return Err(Error::NotFound(dev_eui.to_string())); + } + let ds = chirpstack_api::internal::DeviceSession::decode(&mut Cursor::new(v)) + .context("Decode device-session")?; + Ok(ds) } pub async fn delete(dev_eui: &EUI64) -> Result<()> { - task::spawn_blocking({ - let dev_eui = *dev_eui; - move || -> Result<()> { - let key = redis_key(format!("device:{{{}}}:ds", dev_eui)); - let mut c = get_redis_conn()?; - redis::cmd("DEL").arg(&key).query(&mut *c)?; + let key = redis_key(format!("device:{{{}}}:ds", dev_eui)); + let mut c = get_async_redis_conn().await?; + redis::cmd("DEL").arg(&key).query_async(&mut c).await?; - Ok(()) - } - }) - .await??; info!(dev_eui = %dev_eui, "Device-session deleted"); Ok(()) } @@ -204,7 +186,7 @@ pub async fn get_for_phypayload_and_incr_f_cnt_up( // Make sure that in case of concurrent calls for the same uplink only one will // pass. Either the concurrent call would read the incremented uplink frame-counter // or it is unable to aquire the lock. - let mut c = get_redis_conn()?; + let mut c = get_async_redis_conn().await?; let lock_key = redis_key(format!( "device:{{{}}}:ds:lock:{}", hex::encode(&ds.dev_eui), @@ -216,7 +198,8 @@ pub async fn get_for_phypayload_and_incr_f_cnt_up( .arg("EX") .arg(1_usize) .arg("NX") - .query(&mut *c)?; + .query_async(&mut c) + .await?; if !set { return Ok(ValidationStatus::Retransmission(full_f_cnt, ds)); @@ -311,40 +294,31 @@ pub async fn get_for_phypayload( } async fn get_dev_euis_for_dev_addr(dev_addr: DevAddr) -> Result> { - task::spawn_blocking({ - move || -> Result> { - let key = redis_key(format!("devaddr:{{{}}}", dev_addr)); - let mut c = get_redis_conn()?; - let dev_euis: HashSet> = redis::cmd("SMEMBERS") - .arg(key) - .query(&mut *c) - .context("Get DevEUIs for DevAddr")?; + let key = redis_key(format!("devaddr:{{{}}}", dev_addr)); + let mut c = get_async_redis_conn().await?; + let dev_euis: HashSet> = redis::cmd("SMEMBERS") + .arg(key) + .query_async(&mut c) + .await + .context("Get DevEUIs for DevAddr")?; - let mut out = Vec::new(); - for dev_eui in &dev_euis { - out.push(EUI64::from_slice(dev_eui)?); - } - - Ok(out) - } - }) - .await? + let mut out = Vec::new(); + for dev_eui in &dev_euis { + out.push(EUI64::from_slice(dev_eui)?); + } + Ok(out) } async fn remove_dev_eui_from_dev_addr_set(dev_addr: DevAddr, dev_eui: EUI64) -> Result<()> { - task::spawn_blocking({ - move || -> Result<()> { - let key = redis_key(format!("devaddr:{{{}}}", dev_addr)); - let mut c = get_redis_conn()?; - redis::cmd("SREM") - .arg(key) - .arg(&dev_eui.to_be_bytes()) - .query(&mut *c)?; + let key = redis_key(format!("devaddr:{{{}}}", dev_addr)); + let mut c = get_async_redis_conn().await?; + redis::cmd("SREM") + .arg(key) + .arg(&dev_eui.to_be_bytes()) + .query_async(&mut c) + .await?; - Ok(()) - } - }) - .await? + Ok(()) } async fn get_for_dev_addr(dev_addr: DevAddr) -> Result> { diff --git a/chirpstack/src/storage/downlink_frame.rs b/chirpstack/src/storage/downlink_frame.rs index 577fc25a..0085f6ce 100644 --- a/chirpstack/src/storage/downlink_frame.rs +++ b/chirpstack/src/storage/downlink_frame.rs @@ -2,42 +2,35 @@ use std::io::Cursor; use anyhow::Result; use prost::Message; -use tokio::task; use tracing::info; -use super::{error::Error, get_redis_conn, redis_key}; +use super::{error::Error, get_async_redis_conn, redis_key}; use chirpstack_api::internal; pub async fn save(df: &internal::DownlinkFrame) -> Result<()> { - task::spawn_blocking({ - let df = df.clone(); - move || -> Result<()> { - let b = df.encode_to_vec(); - let key = redis_key(format!("frame:{}", df.downlink_id)); - let mut c = get_redis_conn()?; - redis::cmd("SETEX").arg(key).arg(30).arg(b).query(&mut *c)?; - Ok(()) - } - }) - .await??; + let b = df.encode_to_vec(); + let key = redis_key(format!("frame:{}", df.downlink_id)); + let mut c = get_async_redis_conn().await?; + redis::cmd("SETEX") + .arg(key) + .arg(30) + .arg(b) + .query_async(&mut c) + .await?; + info!(downlink_id = df.downlink_id, "Downlink-frame saved"); Ok(()) } pub async fn get(id: u32) -> Result { - task::spawn_blocking({ - move || -> Result { - let mut c = get_redis_conn()?; - let key = redis_key(format!("frame:{}", id)); - let v: Vec = redis::cmd("GET").arg(key).query(&mut *c)?; - if v.is_empty() { - return Err(Error::NotFound(format!("{}", id))); - } - let df = internal::DownlinkFrame::decode(&mut Cursor::new(v))?; - Ok(df) - } - }) - .await? + let mut c = get_async_redis_conn().await?; + let key = redis_key(format!("frame:{}", id)); + let v: Vec = redis::cmd("GET").arg(key).query_async(&mut c).await?; + if v.is_empty() { + return Err(Error::NotFound(format!("{}", id))); + } + let df = internal::DownlinkFrame::decode(&mut Cursor::new(v))?; + Ok(df) } #[cfg(test)] diff --git a/chirpstack/src/storage/mac_command.rs b/chirpstack/src/storage/mac_command.rs index 1ec74482..080edb54 100644 --- a/chirpstack/src/storage/mac_command.rs +++ b/chirpstack/src/storage/mac_command.rs @@ -1,74 +1,55 @@ use anyhow::Result; -use tokio::task; use tracing::info; -use super::{get_redis_conn, redis_key}; +use super::{get_async_redis_conn, redis_key}; use crate::config; use lrwn::EUI64; pub async fn set_pending(dev_eui: &EUI64, cid: lrwn::CID, set: &lrwn::MACCommandSet) -> Result<()> { - task::spawn_blocking({ - let dev_eui = *dev_eui; - let set = set.clone(); - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; + let conf = config::get(); + let mut c = get_async_redis_conn().await?; - let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); - let ttl = conf.network.device_session_ttl.as_millis() as usize; - let b = set.to_vec()?; + let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); + let ttl = conf.network.device_session_ttl.as_millis() as usize; + let b = set.to_vec()?; + + redis::cmd("PSETEX") + .arg(key) + .arg(ttl) + .arg(b) + .query_async(&mut c) + .await?; - redis::cmd("PSETEX") - .arg(key) - .arg(ttl) - .arg(b) - .query(&mut *c)?; - Ok(()) - } - }) - .await??; info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block set"); Ok(()) } pub async fn get_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result> { - task::spawn_blocking({ - let dev_eui = *dev_eui; - move || -> Result> { - let mut c = get_redis_conn()?; - let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); - let b: Vec = redis::cmd("GET").arg(key).query(&mut *c)?; + let mut c = get_async_redis_conn().await?; + let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); + let b: Vec = redis::cmd("GET").arg(key).query_async(&mut c).await?; - let out = if !b.is_empty() { - let mut mac = lrwn::MACCommandSet::from_slice(&b); + let out = if !b.is_empty() { + let mut mac = lrwn::MACCommandSet::from_slice(&b); - // Per definition, the uplink flag is set to false as this function is intended to retrieve - // pending mac-commands that were previously sent to the device. - mac.decode_from_raw(false)?; + // Per definition, the uplink flag is set to false as this function is intended to retrieve + // pending mac-commands that were previously sent to the device. + mac.decode_from_raw(false)?; - Some(mac) - } else { - None - }; + Some(mac) + } else { + None + }; - Ok(out) - } - }) - .await? + Ok(out) } pub async fn delete_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<()> { - task::spawn_blocking({ - let dev_eui = *dev_eui; - move || -> Result<()> { - let mut c = get_redis_conn()?; - let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); + let mut c = get_async_redis_conn().await?; + let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); + + redis::cmd("DEL").arg(key).query_async(&mut c).await?; - redis::cmd("DEL").arg(key).query(&mut *c)?; - Ok(()) - } - }) - .await??; info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block deleted"); Ok(()) } diff --git a/chirpstack/src/storage/metrics.rs b/chirpstack/src/storage/metrics.rs index 8d85fba5..61a7840f 100644 --- a/chirpstack/src/storage/metrics.rs +++ b/chirpstack/src/storage/metrics.rs @@ -5,10 +5,9 @@ use std::time::Duration; use anyhow::Result; use chrono::{DateTime, Datelike, Duration as ChronoDuration, Local, TimeZone, Timelike}; use serde::{Deserialize, Serialize}; -use tokio::task; use tracing::info; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; #[allow(clippy::upper_case_acronyms)] #[allow(non_camel_case_types)] @@ -69,23 +68,16 @@ fn get_key(name: &str, a: Aggregation, dt: DateTime) -> String { } pub async fn save_state(name: &str, state: &str) -> Result<()> { - task::spawn_blocking({ - let key = redis_key(format!("metrics:{{{}}}", name)); - let state = state.to_string(); - let ttl = get_ttl(Aggregation::MONTH); + let key = redis_key(format!("metrics:{{{}}}", name)); + let ttl = get_ttl(Aggregation::MONTH); - move || -> Result<()> { - let mut c = get_redis_conn()?; - redis::cmd("PSETEX") - .arg(key) - .arg(ttl.as_millis() as usize) - .arg(state) - .query(&mut *c)?; - - Ok(()) - } - }) - .await??; + let mut c = get_async_redis_conn().await?; + redis::cmd("PSETEX") + .arg(key) + .arg(ttl.as_millis() as usize) + .arg(state) + .query_async(&mut c) + .await?; info!(state = %state, "State saved"); Ok(()) @@ -104,90 +96,78 @@ async fn save_for_interval(a: Aggregation, name: &str, record: &Record) -> Resul return Ok(()); } - task::spawn_blocking({ - let name = name.to_string(); - let record = record.clone(); - let ttl = get_ttl(a); + let ttl = get_ttl(a); - let ts: DateTime = match a { - Aggregation::HOUR => Local - .with_ymd_and_hms( - record.time.year(), - record.time.month(), - record.time.day(), - record.time.hour(), - 0, - 0, - ) - .unwrap(), - Aggregation::DAY => Local - .with_ymd_and_hms( - record.time.year(), - record.time.month(), - record.time.day(), - 0, - 0, - 0, - ) - .unwrap(), - Aggregation::MONTH => Local - .with_ymd_and_hms(record.time.year(), record.time.month(), 1, 0, 0, 0) - .unwrap(), - }; + let ts: DateTime = match a { + Aggregation::HOUR => Local + .with_ymd_and_hms( + record.time.year(), + record.time.month(), + record.time.day(), + record.time.hour(), + 0, + 0, + ) + .unwrap(), + Aggregation::DAY => Local + .with_ymd_and_hms( + record.time.year(), + record.time.month(), + record.time.day(), + 0, + 0, + 0, + ) + .unwrap(), + Aggregation::MONTH => Local + .with_ymd_and_hms(record.time.year(), record.time.month(), 1, 0, 0, 0) + .unwrap(), + }; - move || -> Result<()> { - let mut c = get_redis_conn()?; - let key = get_key(&name, a, ts); - let mut pipe = c.new_pipeline(); - pipe.atomic(); + let mut c = get_async_redis_conn().await?; + let key = get_key(&name, a, ts); + let mut pipe = redis::pipe(); + pipe.atomic(); - for (k, v) in &record.metrics { - // Passing a reference to hincr will return a runtime error. - let k = k.clone(); - let v = *v; + for (k, v) in &record.metrics { + // Passing a reference to hincr will return a runtime error. + let k = k.clone(); + let v = *v; - match record.kind { - Kind::COUNTER => { - pipe.cmd("HSET").arg(&key).arg(k).arg(v).ignore(); - } - Kind::ABSOLUTE => { - pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore(); - } - Kind::GAUGE => { - pipe.cmd("HINCRBYFLOAT") - .arg(&key) - .arg(format!("_{}_count", k)) - .arg(1.0) - .ignore(); - pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore(); - } - } + match record.kind { + Kind::COUNTER => { + pipe.cmd("HSET").arg(&key).arg(k).arg(v).ignore(); + } + Kind::ABSOLUTE => { + pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore(); + } + Kind::GAUGE => { + pipe.cmd("HINCRBYFLOAT") + .arg(&key) + .arg(format!("_{}_count", k)) + .arg(1.0) + .ignore(); + pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore(); } - - pipe.cmd("PEXPIRE") - .arg(&key) - .arg(ttl.as_millis() as usize) - .ignore() - .query(&mut c)?; - - Ok(()) } - }) - .await??; + } + + pipe.cmd("PEXPIRE") + .arg(&key) + .arg(ttl.as_millis() as usize) + .ignore() + .query_async(&mut c) + .await?; + info!(name = %name, aggregation = %a, "Metrics saved"); Ok(()) } pub async fn get_state(name: &str) -> Result { - task::spawn_blocking({ - let key = redis_key(format!("metrics:{{{}}}", name)); - move || -> Result { - let mut c = get_redis_conn()?; - let v: Option = redis::cmd("GET").arg(key).query(&mut *c)?; - Ok(v.unwrap_or_default()) - } - }) - .await? + let key = redis_key(format!("metrics:{{{}}}", name)); + let mut c = get_async_redis_conn().await?; + let v: Option = redis::cmd("GET").arg(key).query_async(&mut c).await?; + Ok(v.unwrap_or_default()) } pub async fn get( @@ -278,54 +258,48 @@ pub async fn get( return Ok(Vec::new()); } - task::spawn_blocking({ - let keys = keys.clone(); - move || -> Result> { - let mut c = get_redis_conn()?; - let mut pipe = c.new_pipeline(); + let mut c = get_async_redis_conn().await?; + let mut pipe = redis::pipe(); - for k in &keys { - pipe.cmd("HGETALL").arg(k); - } + for k in &keys { + pipe.cmd("HGETALL").arg(k); + } - let res: Vec> = pipe.query(&mut c)?; - let mut out: Vec = Vec::new(); + let res: Vec> = pipe.query_async(&mut c).await?; + let mut out: Vec = Vec::new(); - for (i, r) in res.iter().enumerate() { - let mut metrics = r.clone(); + for (i, r) in res.iter().enumerate() { + let mut metrics = r.clone(); - // In case of GAUGE values, the total aggregated value must be divided by the - // number of measurements. - if kind == Kind::GAUGE { - let counts: HashMap = r - .iter() - .filter(|(k, _)| k.starts_with('_') && k.ends_with("_count")) - .map(|(k, v)| (k.to_string(), *v)) - .collect(); + // In case of GAUGE values, the total aggregated value must be divided by the + // number of measurements. + if kind == Kind::GAUGE { + let counts: HashMap = r + .iter() + .filter(|(k, _)| k.starts_with('_') && k.ends_with("_count")) + .map(|(k, v)| (k.to_string(), *v)) + .collect(); - for (k, count) in counts { - let k = k.strip_prefix('_').unwrap().strip_suffix("_count").unwrap(); - if let Some(v) = metrics.get_mut(k) { - *v /= count; - } - } + for (k, count) in counts { + let k = k.strip_prefix('_').unwrap().strip_suffix("_count").unwrap(); + if let Some(v) = metrics.get_mut(k) { + *v /= count; } - - out.push(Record { - time: timestamps[i], - kind, - metrics: metrics - .iter() - .filter(|(k, _)| !k.starts_with('_')) - .map(|(k, v)| (k.to_string(), *v)) - .collect(), - }); } - - Ok(out) } - }) - .await? + + out.push(Record { + time: timestamps[i], + kind, + metrics: metrics + .iter() + .filter(|(k, _)| !k.starts_with('_')) + .map(|(k, v)| (k.to_string(), *v)) + .collect(), + }); + } + + Ok(out) } #[cfg(test)] diff --git a/chirpstack/src/storage/mod.rs b/chirpstack/src/storage/mod.rs index 7ad53521..4ba5f7da 100644 --- a/chirpstack/src/storage/mod.rs +++ b/chirpstack/src/storage/mod.rs @@ -1,6 +1,5 @@ use std::fs::File; use std::io::BufReader; -use std::ops::{Deref, DerefMut}; use std::sync::RwLock; use anyhow::Context; @@ -13,7 +12,8 @@ use diesel_async::AsyncPgConnection; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use futures_util::future::BoxFuture; use futures_util::FutureExt; -use r2d2::{Pool, PooledConnection}; +use redis::aio::ConnectionLike; +use tokio::sync::RwLock as TokioRwLock; use tokio::task; use tracing::{error, info}; @@ -48,129 +48,48 @@ pub type AsyncPgPoolConnection = DeadpoolObject; lazy_static! { static ref ASYNC_PG_POOL: RwLock> = RwLock::new(None); - static ref REDIS_POOL: RwLock> = RwLock::new(None); + static ref ASYNC_REDIS_POOL: TokioRwLock> = TokioRwLock::new(None); static ref REDIS_PREFIX: RwLock = RwLock::new("".to_string()); } pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); -pub enum RedisPool { - Client(Pool), - ClusterClient(Pool), +#[derive(Clone)] +pub enum AsyncRedisPool { + Client(deadpool_redis::Pool), + ClusterClient(deadpool_redis_cluster::Pool), } -pub enum RedisPoolConnection { - Client(PooledConnection), - ClusterClient(PooledConnection), +pub enum AsyncRedisPoolConnection { + Client(deadpool_redis::Connection), + ClusterClient(deadpool_redis_cluster::Connection), } -impl RedisPoolConnection { - pub fn new_pipeline(&self) -> RedisPipeline { +impl ConnectionLike for AsyncRedisPoolConnection { + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { match self { - RedisPoolConnection::Client(_) => RedisPipeline::Pipeline(redis::pipe()), - RedisPoolConnection::ClusterClient(_) => { - RedisPipeline::ClusterPipeline(redis::cluster::cluster_pipe()) - } + AsyncRedisPoolConnection::Client(v) => v.req_packed_command(cmd), + AsyncRedisPoolConnection::ClusterClient(v) => v.req_packed_command(cmd), } } -} - -impl Deref for RedisPoolConnection { - type Target = dyn redis::ConnectionLike; - - fn deref(&self) -> &Self::Target { + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec> { match self { - RedisPoolConnection::Client(v) => v.deref() as &dyn redis::ConnectionLike, - RedisPoolConnection::ClusterClient(v) => v.deref() as &dyn redis::ConnectionLike, + AsyncRedisPoolConnection::Client(v) => v.req_packed_commands(cmd, offset, count), + AsyncRedisPoolConnection::ClusterClient(v) => v.req_packed_commands(cmd, offset, count), } } -} - -impl DerefMut for RedisPoolConnection { - fn deref_mut(&mut self) -> &mut Self::Target { + fn get_db(&self) -> i64 { match self { - RedisPoolConnection::Client(v) => v.deref_mut() as &mut dyn redis::ConnectionLike, - RedisPoolConnection::ClusterClient(v) => { - v.deref_mut() as &mut dyn redis::ConnectionLike - } - } - } -} - -pub enum RedisPipeline { - Pipeline(redis::Pipeline), - ClusterPipeline(redis::cluster::ClusterPipeline), -} - -impl RedisPipeline { - pub fn cmd(&mut self, name: &str) -> &mut Self { - match self { - RedisPipeline::Pipeline(p) => { - p.cmd(name); - } - RedisPipeline::ClusterPipeline(p) => { - p.cmd(name); - } - } - self - } - - pub fn arg(&mut self, arg: T) -> &mut Self { - match self { - RedisPipeline::Pipeline(p) => { - p.arg(arg); - } - RedisPipeline::ClusterPipeline(p) => { - p.arg(arg); - } - } - self - } - - pub fn ignore(&mut self) -> &mut Self { - match self { - RedisPipeline::Pipeline(p) => { - p.ignore(); - } - RedisPipeline::ClusterPipeline(p) => { - p.ignore(); - } - } - self - } - - pub fn atomic(&mut self) -> &mut Self { - match self { - RedisPipeline::Pipeline(p) => { - p.atomic(); - } - RedisPipeline::ClusterPipeline(_) => { - // TODO: ClusterPipeline does not (yet?) provide .atomic() method. - // https://github.com/redis-rs/redis-rs/issues/731 - } - } - self - } - - pub fn query( - &mut self, - con: &mut RedisPoolConnection, - ) -> redis::RedisResult { - match self { - RedisPipeline::Pipeline(p) => { - if let RedisPoolConnection::Client(c) = con { - p.query(&mut **c) - } else { - panic!("Mismatch between RedisPipeline and RedisPoolConnection") - } - } - RedisPipeline::ClusterPipeline(p) => { - if let RedisPoolConnection::ClusterClient(c) = con { - p.query(c) - } else { - panic!("Mismatch between RedisPipeline and RedisPoolConnection") - } - } + AsyncRedisPoolConnection::Client(v) => v.get_db(), + AsyncRedisPoolConnection::ClusterClient(v) => v.get_db(), } } } @@ -194,29 +113,17 @@ pub async fn setup() -> Result<()> { info!("Setting up Redis client"); if conf.redis.cluster { - let client = redis::cluster::ClusterClientBuilder::new(conf.redis.servers.clone()) - .build() - .context("ClusterClient open")?; - let pool: r2d2::Pool = r2d2::Pool::builder() - .max_size(conf.redis.max_open_connections) - .min_idle(match conf.redis.min_idle_connections { - 0 => None, - _ => Some(conf.redis.min_idle_connections), - }) - .build(client) - .context("Building Redis pool")?; - set_redis_pool(RedisPool::ClusterClient(pool)); + let pool = deadpool_redis_cluster::Config::from_urls(conf.redis.servers.clone()) + .builder()? + .max_size(conf.redis.max_open_connections as usize) + .build()?; + set_async_redis_pool(AsyncRedisPool::ClusterClient(pool)).await; } else { - let client = redis::Client::open(conf.redis.servers[0].clone()).context("Redis client")?; - let pool: r2d2::Pool = r2d2::Pool::builder() - .max_size(conf.redis.max_open_connections) - .min_idle(match conf.redis.min_idle_connections { - 0 => None, - _ => Some(conf.redis.min_idle_connections), - }) - .build(client) - .context("Building Redis pool")?; - set_redis_pool(RedisPool::Client(pool)); + let pool = deadpool_redis::Config::from_url(conf.redis.servers[0].clone()) + .builder()? + .max_size(conf.redis.max_open_connections as usize) + .build()?; + set_async_redis_pool(AsyncRedisPool::Client(pool)).await; } if !conf.redis.key_prefix.is_empty() { @@ -289,14 +196,23 @@ pub async fn get_async_db_conn() -> Result { Ok(pool.get().await?) } -pub fn get_redis_conn() -> Result { - let pool_r = REDIS_POOL.read().unwrap(); - let pool = pool_r +async fn get_async_redis_pool() -> Result { + let pool_r = ASYNC_REDIS_POOL.read().await; + let pool: AsyncRedisPool = pool_r .as_ref() - .ok_or_else(|| anyhow!("Redis connection pool is not initialized (yet)"))?; + .ok_or_else(|| anyhow!("Redis connection pool is not initialized"))? + .clone(); + Ok(pool) +} + +pub async fn get_async_redis_conn() -> Result { + let pool = get_async_redis_pool().await?; + Ok(match pool { - RedisPool::Client(v) => RedisPoolConnection::Client(v.get()?), - RedisPool::ClusterClient(v) => RedisPoolConnection::ClusterClient(v.get()?), + AsyncRedisPool::Client(v) => AsyncRedisPoolConnection::Client(v.get().await?), + AsyncRedisPool::ClusterClient(v) => { + AsyncRedisPoolConnection::ClusterClient(v.clone().get().await?) + } }) } @@ -322,8 +238,8 @@ pub async fn run_db_migrations() -> Result<()> { .await? } -pub fn set_redis_pool(p: RedisPool) { - let mut pool_w = REDIS_POOL.write().unwrap(); +async fn set_async_redis_pool(p: AsyncRedisPool) { + let mut pool_w = ASYNC_REDIS_POOL.write().await; *pool_w = Some(p); } @@ -353,8 +269,8 @@ pub async fn reset_db() -> Result<()> { #[cfg(test)] pub async fn reset_redis() -> Result<()> { - let mut c = get_redis_conn()?; - redis::cmd("FLUSHDB").query(&mut *c)?; + let mut c = get_async_redis_conn().await?; + redis::cmd("FLUSHDB").query_async(&mut c).await?; Ok(()) } diff --git a/chirpstack/src/storage/passive_roaming.rs b/chirpstack/src/storage/passive_roaming.rs index 1522dc76..2d5901ad 100644 --- a/chirpstack/src/storage/passive_roaming.rs +++ b/chirpstack/src/storage/passive_roaming.rs @@ -4,12 +4,11 @@ use std::str::FromStr; use anyhow::{Context, Result}; use chrono::{DateTime, Duration, Utc}; use prost::Message; -use tokio::task; use tracing::{debug, info}; use uuid::Uuid; use super::error::Error; -use super::{get_redis_conn, redis_key}; +use super::{get_async_redis_conn, redis_key}; use crate::config; use chirpstack_api::internal; use lrwn::{AES128Key, DevAddr, EUI64}; @@ -37,59 +36,52 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> { return Ok(()); } - task::spawn_blocking({ - let ds = ds.clone(); - move || -> Result<()> { - let conf = config::get(); + let conf = config::get(); - let dev_addr_key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr)); - let dev_eui_key = redis_key(format!("pr:dev:{{{}}}", dev_eui)); - let sess_key = redis_key(format!("pr:sess:{{{}}}", sess_id)); - let b = ds.encode_to_vec(); - let ttl = conf.network.device_session_ttl.as_millis() as usize; - let pr_ttl = lifetime.num_milliseconds() as usize; + let dev_addr_key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr)); + let dev_eui_key = redis_key(format!("pr:dev:{{{}}}", dev_eui)); + let sess_key = redis_key(format!("pr:sess:{{{}}}", sess_id)); + let b = ds.encode_to_vec(); + let ttl = conf.network.device_session_ttl.as_millis() as usize; + let pr_ttl = lifetime.num_milliseconds() as usize; - let mut c = get_redis_conn()?; + let mut c = get_async_redis_conn().await?; - // We need to store a pointer from both the DevAddr and DevEUI to the - // passive-roaming device-session ID. This is needed: - // * Because the DevAddr is not guaranteed to be unique - // * Because the DevEUI might not be given (thus is also not guaranteed - // to be an unique identifier). - // - // But: - // * We need to be able to lookup the session using the DevAddr (potentially - // using the MIC validation). - // * We need to be able to stop a passive-roaming session given a DevEUI. - c.new_pipeline() - .atomic() - .cmd("SADD") - .arg(&dev_addr_key) - .arg(&sess_id.to_string()) - .ignore() - .cmd("SADD") - .arg(&dev_eui_key) - .arg(&sess_id.to_string()) - .ignore() - .cmd("PEXPIRE") - .arg(&dev_addr_key) - .arg(ttl) - .ignore() - .cmd("PEXPIRE") - .arg(&dev_eui_key) - .arg(ttl) - .ignore() - .cmd("PSETEX") - .arg(&sess_key) - .arg(pr_ttl) - .arg(b) - .ignore() - .query(&mut c)?; - - Ok(()) - } - }) - .await??; + // We need to store a pointer from both the DevAddr and DevEUI to the + // passive-roaming device-session ID. This is needed: + // * Because the DevAddr is not guaranteed to be unique + // * Because the DevEUI might not be given (thus is also not guaranteed + // to be an unique identifier). + // + // But: + // * We need to be able to lookup the session using the DevAddr (potentially + // using the MIC validation). + // * We need to be able to stop a passive-roaming session given a DevEUI. + redis::pipe() + .atomic() + .cmd("SADD") + .arg(&dev_addr_key) + .arg(&sess_id.to_string()) + .ignore() + .cmd("SADD") + .arg(&dev_eui_key) + .arg(&sess_id.to_string()) + .ignore() + .cmd("PEXPIRE") + .arg(&dev_addr_key) + .arg(ttl) + .ignore() + .cmd("PEXPIRE") + .arg(&dev_eui_key) + .arg(ttl) + .ignore() + .cmd("PSETEX") + .arg(&sess_key) + .arg(pr_ttl) + .arg(b) + .ignore() + .query_async(&mut c) + .await?; info!(id = %sess_id, "Passive-roaming device-session saved"); @@ -97,35 +89,26 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> { } pub async fn get(id: Uuid) -> Result { - task::spawn_blocking({ - move || -> Result { - let key = redis_key(format!("pr:sess:{{{}}}", id)); - let mut c = get_redis_conn()?; - let v: Vec = redis::cmd("GET") - .arg(key) - .query(&mut *c) - .context("Get passive-roaming device-session")?; - if v.is_empty() { - return Err(Error::NotFound(id.to_string())); - } - let ds = internal::PassiveRoamingDeviceSession::decode(&mut Cursor::new(v)) - .context("Decode passive-roaming device-session")?; - Ok(ds) - } - }) - .await? + let key = redis_key(format!("pr:sess:{{{}}}", id)); + let mut c = get_async_redis_conn().await?; + let v: Vec = redis::cmd("GET") + .arg(key) + .query_async(&mut c) + .await + .context("Get passive-roaming device-session")?; + if v.is_empty() { + return Err(Error::NotFound(id.to_string())); + } + let ds = internal::PassiveRoamingDeviceSession::decode(&mut Cursor::new(v)) + .context("Decode passive-roaming device-session")?; + Ok(ds) } pub async fn delete(id: Uuid) -> Result<()> { - task::spawn_blocking({ - move || -> Result<()> { - let key = redis_key(format!("pr:sess:{{{}}}", id)); - let mut c = get_redis_conn()?; - redis::cmd("DEL").arg(&key).query(&mut *c)?; - Ok(()) - } - }) - .await??; + let key = redis_key(format!("pr:sess:{{{}}}", id)); + let mut c = get_async_redis_conn().await?; + redis::cmd("DEL").arg(&key).query_async(&mut c).await?; + info!(id = %id, "Passive-roaming device-session deleted"); Ok(()) } @@ -197,39 +180,29 @@ async fn get_sessions_for_dev_addr( } async fn get_session_ids_for_dev_addr(dev_addr: DevAddr) -> Result> { - task::spawn_blocking({ - move || -> Result> { - let key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr)); - let mut c = get_redis_conn()?; - let v: Vec = redis::cmd("SMEMBERS").arg(key).query(&mut *c)?; + let key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr)); + let mut c = get_async_redis_conn().await?; + let v: Vec = redis::cmd("SMEMBERS").arg(key).query_async(&mut c).await?; - let mut out: Vec = Vec::new(); - for id in &v { - out.push(Uuid::from_str(id)?); - } + let mut out: Vec = Vec::new(); + for id in &v { + out.push(Uuid::from_str(id)?); + } - Ok(out) - } - }) - .await? + Ok(out) } pub async fn get_session_ids_for_dev_eui(dev_eui: EUI64) -> Result> { - task::spawn_blocking({ - move || -> Result> { - let key = redis_key(format!("pr:dev:{{{}}}", dev_eui)); - let mut c = get_redis_conn()?; - let v: Vec = redis::cmd("SMEMBERS").arg(key).query(&mut *c)?; + let key = redis_key(format!("pr:dev:{{{}}}", dev_eui)); + let mut c = get_async_redis_conn().await?; + let v: Vec = redis::cmd("SMEMBERS").arg(key).query_async(&mut c).await?; - let mut out: Vec = Vec::new(); - for id in &v { - out.push(Uuid::from_str(id)?); - } + let mut out: Vec = Vec::new(); + for id in &v { + out.push(Uuid::from_str(id)?); + } - Ok(out) - } - }) - .await? + Ok(out) } fn get_full_f_cnt_up(next_expected_full_fcnt: u32, truncated_f_cnt: u32) -> u32 { diff --git a/chirpstack/src/stream/api_request.rs b/chirpstack/src/stream/api_request.rs index 39ff7849..ce7dd12b 100644 --- a/chirpstack/src/stream/api_request.rs +++ b/chirpstack/src/stream/api_request.rs @@ -1,38 +1,31 @@ use anyhow::Result; use prost::Message; -use tokio::task; use crate::config; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; use chirpstack_api::stream; pub async fn log_request(pl: &stream::ApiRequestLog) -> Result<()> { - task::spawn_blocking({ - let pl = pl.clone(); + let conf = config::get(); + let mut c = get_async_redis_conn().await?; - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; + if conf.monitoring.api_request_log_max_history == 0 { + return Ok(()); + } - if conf.monitoring.api_request_log_max_history == 0 { - return Ok(()); - } + let key = redis_key("api:stream:request".to_string()); + let b = pl.encode_to_vec(); + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.api_request_log_max_history) + .arg("*") + .arg("request") + .arg(&b) + .query_async(&mut c) + .await?; - let key = redis_key("api:stream:request".to_string()); - let b = pl.encode_to_vec(); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.api_request_log_max_history) - .arg("*") - .arg("request") - .arg(&b) - .query(&mut *c)?; - - Ok(()) - } - }) - .await? + Ok(()) } #[cfg(test)] @@ -56,7 +49,7 @@ mod tests { }; log_request(&pl).await.unwrap(); - let mut c = get_redis_conn().unwrap(); + let mut c = get_async_redis_conn().await.unwrap(); let key = redis_key("api:stream:request".to_string()); let srr: StreamReadReply = redis::cmd("XREAD") .arg("COUNT") @@ -64,7 +57,8 @@ mod tests { .arg("STREAMS") .arg(&key) .arg("0") - .query(&mut *c) + .query_async(&mut c) + .await .unwrap(); assert_eq!(1, srr.keys.len()); diff --git a/chirpstack/src/stream/backend_interfaces.rs b/chirpstack/src/stream/backend_interfaces.rs index a6887cf5..73cfdc15 100644 --- a/chirpstack/src/stream/backend_interfaces.rs +++ b/chirpstack/src/stream/backend_interfaces.rs @@ -1,34 +1,52 @@ use anyhow::Result; use prost::Message; -use tokio::task; +use tokio::sync::mpsc::{self, Sender}; +use tracing::error; use crate::config; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; use chirpstack_api::stream; -pub async fn log_request(pl: stream::BackendInterfacesRequest) -> Result<()> { - task::spawn_blocking({ - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; +pub async fn get_log_sender() -> Option> { + let conf = config::get(); + if conf.monitoring.backend_interfaces_log_max_history == 0 { + return None; + } - if conf.monitoring.backend_interfaces_log_max_history == 0 { - return Ok(()); - } + let (tx, mut rx) = mpsc::channel(100); - let key = redis_key("backend_interfaces:stream:request".to_string()); - let b = pl.encode_to_vec(); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.backend_interfaces_log_max_history) - .arg("*") - .arg("request") - .arg(&b) - .query(&mut *c)?; - - Ok(()) + tokio::spawn(async move { + while let Some(pl) = rx.recv().await { + tokio::spawn(async move { + if let Err(e) = log_request(pl).await { + error!(error = %e, "Log request error"); + } + }); } - }) - .await? + }); + + Some(tx) +} + +pub async fn log_request(pl: stream::BackendInterfacesRequest) -> Result<()> { + let conf = config::get(); + let mut c = get_async_redis_conn().await?; + + if conf.monitoring.backend_interfaces_log_max_history == 0 { + return Ok(()); + } + + let key = redis_key("backend_interfaces:stream:request".to_string()); + let b = pl.encode_to_vec(); + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.backend_interfaces_log_max_history) + .arg("*") + .arg("request") + .arg(&b) + .query_async(&mut c) + .await?; + + Ok(()) } diff --git a/chirpstack/src/stream/event.rs b/chirpstack/src/stream/event.rs index c495be75..77923f81 100644 --- a/chirpstack/src/stream/event.rs +++ b/chirpstack/src/stream/event.rs @@ -6,62 +6,55 @@ use anyhow::{Context, Result}; use prost::Message; use redis::streams::StreamReadReply; use tokio::sync::mpsc; -use tokio::task; use tracing::{debug, error, trace}; use crate::config; use crate::helpers::errors::PrintFullError; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; use chirpstack_api::{api, integration}; #[allow(clippy::enum_variant_names)] pub async fn log_event_for_device(typ: &str, dev_eui: &str, b: &[u8]) -> Result<()> { - task::spawn_blocking({ - let typ = typ.to_string(); - let dev_eui = dev_eui.to_string(); - let b = b.to_vec(); - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; + let conf = config::get(); + let mut c = get_async_redis_conn().await?; - // per device stream - if conf.monitoring.per_device_event_log_max_history > 0 { - let key = redis_key(format!("device:{{{}}}:stream:event", dev_eui)); - c.new_pipeline() - .atomic() - .cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.per_device_event_log_max_history) - .arg("*") - .arg(&typ) - .arg(&b) - .ignore() - .cmd("PEXPIRE") - .arg(&key) - .arg(conf.monitoring.per_device_event_log_ttl.as_millis() as usize) - .ignore() - .query(&mut c)?; - } + // per device stream + if conf.monitoring.per_device_event_log_max_history > 0 { + let key = redis_key(format!("device:{{{}}}:stream:event", dev_eui)); + redis::pipe() + .atomic() + .cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.per_device_event_log_max_history) + .arg("*") + .arg(typ) + .arg(b) + .ignore() + .cmd("PEXPIRE") + .arg(&key) + .arg(conf.monitoring.per_device_event_log_ttl.as_millis() as usize) + .ignore() + .query_async(&mut c) + .await?; + } - // global device stream - if conf.monitoring.device_event_log_max_history > 0 { - let key = redis_key("device:stream:event".to_string()); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.device_event_log_max_history) - .arg("*") - .arg(&typ) - .arg(&b) - .query(&mut *c)?; - } + // global device stream + if conf.monitoring.device_event_log_max_history > 0 { + let key = redis_key("device:stream:event".to_string()); + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.device_event_log_max_history) + .arg("*") + .arg(typ) + .arg(b) + .query_async(&mut c) + .await?; + } - Ok(()) - } - }) - .await? + Ok(()) } pub async fn get_event_logs( @@ -69,245 +62,240 @@ pub async fn get_event_logs( count: usize, channel: mpsc::Sender, ) -> Result<()> { - task::spawn_blocking({ - let key = key.clone(); - let channel = channel.clone(); + let mut last_id = "0".to_string(); + let mut c = get_async_redis_conn().await?; - move || -> Result<()> { - let mut last_id = "0".to_string(); - let mut c = get_redis_conn()?; + loop { + if channel.is_closed() { + debug!("Channel has been closed, returning"); + return Ok(()); + } - loop { - if channel.is_closed() { - debug!("Channel has been closed, returning"); - return Ok(()); - } + let srr: StreamReadReply = redis::cmd("XREAD") + .arg("COUNT") + .arg(count) + .arg("STREAMS") + .arg(&key) + .arg(&last_id) + .query_async(&mut c) + .await + .context("XREAD event stream")?; - let srr: StreamReadReply = redis::cmd("XREAD") - .arg("COUNT") - .arg(count) - .arg("STREAMS") - .arg(&key) - .arg(&last_id) - .query(&mut *c) - .context("XREAD event stream")?; - - for stream_key in &srr.keys { - for stream_id in &stream_key.ids { - last_id = stream_id.id.clone(); - for (k, v) in &stream_id.map { - let res = || -> Result<()> { - match k.as_ref() { - "up" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [ - ("DR".to_string(), pl.dr.to_string()), - ("FPort".to_string(), pl.f_port.to_string()), - ("FCnt".to_string(), pl.f_cnt.to_string()), - ("Data".to_string(), hex::encode(&pl.data)), - ] - .iter() - .cloned() - .collect(), - }; - - channel.blocking_send(pl)?; - } - } - "join" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [("DevAddr".to_string(), pl.dev_addr)] - .iter() - .cloned() - .collect(), - }; - - channel.blocking_send(pl)?; - } - } - "ack" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::AckEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [].iter().cloned().collect(), - }; - - channel.blocking_send(pl)?; - } - } - "txack" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [].iter().cloned().collect(), - }; - - channel.blocking_send(pl)?; - } - } - "status" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [ - ("Margin".into(), format!("{}", pl.margin)), - ( - "Battery level".into(), - format!("{:.0}%", pl.battery_level), - ), - ( - "Battery level unavailable".into(), - format!("{}", pl.battery_level_unavailable), - ), - ( - "External power source".into(), - format!("{}", pl.external_power_source), - ), - ] - .iter() - .cloned() - .collect(), - }; - - channel.blocking_send(pl)?; - } - } - "log" => { - trace!(key = %k, id =%last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::LogEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [ - ("Level".into(), pl.level().into()), - ("Code".into(), pl.code().into()), - ] - .iter() - .cloned() - .collect(), - }; - - channel.blocking_send(pl)?; - } - } - "location" => { - trace!(key = %k, id=%last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::LocationEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [].iter().cloned().collect(), - }; - - channel.blocking_send(pl)?; - } - } - "integration" => { - trace!(key = %k, id=%last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = - integration::IntegrationEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [ - ("Integration".into(), pl.integration_name.clone()), - ("Event".into(), pl.event_type.clone()), - ] - .iter() - .cloned() - .collect(), - }; - - channel.blocking_send(pl)?; - } - } - _ => { - error!(key = %k, "Unexpected key in in event-log stream"); - } + for stream_key in &srr.keys { + for stream_id in &stream_key.ids { + last_id = stream_id.id.clone(); + for (k, v) in &stream_id.map { + let res = || -> Result<()> { + match k.as_ref() { + "up" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp { + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [ + ("DR".to_string(), pl.dr.to_string()), + ("FPort".to_string(), pl.f_port.to_string()), + ("FCnt".to_string(), pl.f_cnt.to_string()), + ("Data".to_string(), hex::encode(&pl.data)), + ] + .iter() + .cloned() + .collect(), + }; + channel.blocking_send(pl)?; } + } + "join" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp { + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [("DevAddr".to_string(), pl.dev_addr)] + .iter() + .cloned() + .collect(), + }; - Ok(()) - }(); - - if let Err(e) = res { - // Return in case of channel error, in any other case we just log - // the error. - if e.downcast_ref::>().is_some() { - return Err(e); + channel.blocking_send(pl)?; } + } + "ack" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::AckEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp { + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [].iter().cloned().collect(), + }; - error!(key = %k, error = %e.full(), "Parsing frame-log error"); + channel.blocking_send(pl)?; + } + } + "txack" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp { + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [].iter().cloned().collect(), + }; + + channel.blocking_send(pl)?; + } + } + "status" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp { + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [ + ("Margin".into(), format!("{}", pl.margin)), + ( + "Battery level".into(), + format!("{:.0}%", pl.battery_level), + ), + ( + "Battery level unavailable".into(), + format!("{}", pl.battery_level_unavailable), + ), + ( + "External power source".into(), + format!("{}", pl.external_power_source), + ), + ] + .iter() + .cloned() + .collect(), + }; + + channel.blocking_send(pl)?; + } + } + "log" => { + trace!(key = %k, id =%last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::LogEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp { + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [ + ("Level".into(), pl.level().into()), + ("Code".into(), pl.code().into()), + ] + .iter() + .cloned() + .collect(), + }; + + channel.blocking_send(pl)?; + } + } + "location" => { + trace!(key = %k, id=%last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = + integration::LocationEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp { + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [].iter().cloned().collect(), + }; + + channel.blocking_send(pl)?; + } + } + "integration" => { + trace!(key = %k, id=%last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = + integration::IntegrationEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp { + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [ + ("Integration".into(), pl.integration_name.clone()), + ("Event".into(), pl.event_type.clone()), + ] + .iter() + .cloned() + .collect(), + }; + + channel.blocking_send(pl)?; + } + } + _ => { + error!(key = %k, "Unexpected key in in event-log stream"); } } + + Ok(()) + }(); + + if let Err(e) = res { + // Return in case of channel error, in any other case we just log + // the error. + if e.downcast_ref::>() + .is_some() + { + return Err(e); + } + + error!(key = %k, error = %e.full(), "Parsing frame-log error"); } } - - sleep(Duration::from_secs(1)); } } - }) - .await? + + sleep(Duration::from_secs(1)); + } } diff --git a/chirpstack/src/stream/frame.rs b/chirpstack/src/stream/frame.rs index 46457cfe..d3dd462b 100644 --- a/chirpstack/src/stream/frame.rs +++ b/chirpstack/src/stream/frame.rs @@ -8,79 +8,74 @@ use prost::Message; use redis::streams::StreamReadReply; use serde_json::json; use tokio::sync::mpsc; -use tokio::task; use tracing::{debug, error, trace, warn}; use lrwn::EUI64; use crate::config; use crate::helpers::errors::PrintFullError; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; use chirpstack_api::{api, stream}; pub async fn log_uplink_for_gateways(ufl: &stream::UplinkFrameLog) -> Result<()> { - task::spawn_blocking({ - let ufl = ufl.clone(); - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; + let conf = config::get(); + let mut c = get_async_redis_conn().await?; - for rx_info in &ufl.rx_info { - let gateway_id = EUI64::from_str(&rx_info.gateway_id)?; + for rx_info in &ufl.rx_info { + let gateway_id = EUI64::from_str(&rx_info.gateway_id)?; - let ufl_copy = stream::UplinkFrameLog { - phy_payload: ufl.phy_payload.clone(), - tx_info: ufl.tx_info.clone(), - rx_info: vec![rx_info.clone()], - m_type: ufl.m_type, - dev_addr: ufl.dev_addr.clone(), - dev_eui: ufl.dev_eui.clone(), - time: ufl.time.clone(), - plaintext_f_opts: ufl.plaintext_f_opts, - plaintext_frm_payload: ufl.plaintext_frm_payload, - }; + let ufl_copy = stream::UplinkFrameLog { + phy_payload: ufl.phy_payload.clone(), + tx_info: ufl.tx_info.clone(), + rx_info: vec![rx_info.clone()], + m_type: ufl.m_type, + dev_addr: ufl.dev_addr.clone(), + dev_eui: ufl.dev_eui.clone(), + time: ufl.time.clone(), + plaintext_f_opts: ufl.plaintext_f_opts, + plaintext_frm_payload: ufl.plaintext_frm_payload, + }; - let b = ufl_copy.encode_to_vec(); + let b = ufl_copy.encode_to_vec(); - // per gateway stream - if conf.monitoring.per_gateway_frame_log_max_history > 0 { - let key = redis_key(format!("gw:{{{}}}:stream:frame", gateway_id)); + // per gateway stream + if conf.monitoring.per_gateway_frame_log_max_history > 0 { + let key = redis_key(format!("gw:{{{}}}:stream:frame", gateway_id)); - c.new_pipeline() - .atomic() - .cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.per_gateway_frame_log_max_history) - .arg("*") - .arg("up") - .arg(&b) - .ignore() - .cmd("PEXPIRE") - .arg(&key) - .arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize) - .ignore() - .query(&mut c)?; - } - - // global gateway stream - if conf.monitoring.gateway_frame_log_max_history > 0 { - let key = redis_key("gw:stream:frame".to_string()); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.gateway_frame_log_max_history) - .arg("*") - .arg("up") - .arg(&b) - .query(&mut *c)?; - } - } - - Ok(()) + redis::pipe() + .atomic() + .cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.per_gateway_frame_log_max_history) + .arg("*") + .arg("up") + .arg(&b) + .ignore() + .cmd("PEXPIRE") + .arg(&key) + .arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize) + .ignore() + .query_async(&mut c) + .await?; } - }) - .await? + + // global gateway stream + if conf.monitoring.gateway_frame_log_max_history > 0 { + let key = redis_key("gw:stream:frame".to_string()); + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.gateway_frame_log_max_history) + .arg("*") + .arg("up") + .arg(&b) + .query_async(&mut c) + .await?; + } + } + + Ok(()) } pub async fn log_downlink_for_gateway(dfl: &stream::DownlinkFrameLog) -> Result<()> { @@ -88,51 +83,46 @@ pub async fn log_downlink_for_gateway(dfl: &stream::DownlinkFrameLog) -> Result< return Err(anyhow!("gateway_id must be set")); } - task::spawn_blocking({ - let dfl = dfl.clone(); + let conf = config::get(); + let mut c = get_async_redis_conn().await?; + let b = dfl.encode_to_vec(); - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; - let b = dfl.encode_to_vec(); + // per gateway stream + if conf.monitoring.per_gateway_frame_log_max_history > 0 { + let key = redis_key(format!("gw:{{{}}}:stream:frame", dfl.gateway_id)); + redis::pipe() + .atomic() + .cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.per_gateway_frame_log_max_history) + .arg("*") + .arg("down") + .arg(&b) + .ignore() + .cmd("PEXPIRE") + .arg(&key) + .arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize) + .ignore() + .query_async(&mut c) + .await?; + } - // per gateway stream - if conf.monitoring.per_gateway_frame_log_max_history > 0 { - let key = redis_key(format!("gw:{{{}}}:stream:frame", dfl.gateway_id)); - c.new_pipeline() - .atomic() - .cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.per_gateway_frame_log_max_history) - .arg("*") - .arg("down") - .arg(&b) - .ignore() - .cmd("PEXPIRE") - .arg(&key) - .arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize) - .ignore() - .query(&mut c)?; - } + // global gateway stream + if conf.monitoring.gateway_frame_log_max_history > 0 { + let key = redis_key("gw:stream:frame".to_string()); + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.gateway_frame_log_max_history) + .arg("*") + .arg("down") + .arg(&b) + .query_async(&mut c) + .await?; + } - // global gateway stream - if conf.monitoring.gateway_frame_log_max_history > 0 { - let key = redis_key("gw:stream:frame".to_string()); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.gateway_frame_log_max_history) - .arg("*") - .arg("down") - .arg(&b) - .query(&mut *c)?; - } - - Ok(()) - } - }) - .await? + Ok(()) } pub async fn log_uplink_for_device(ufl: &stream::UplinkFrameLog) -> Result<()> { @@ -140,51 +130,47 @@ pub async fn log_uplink_for_device(ufl: &stream::UplinkFrameLog) -> Result<()> { return Err(anyhow!("dev_eui must be set")); } - task::spawn_blocking({ - let ufl = ufl.clone(); - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; - let b = ufl.encode_to_vec(); + let conf = config::get(); + let mut c = get_async_redis_conn().await?; + let b = ufl.encode_to_vec(); - // per device stream - if conf.monitoring.per_device_frame_log_max_history > 0 { - let key = redis_key(format!("device:{{{}}}:stream:frame", ufl.dev_eui)); + // per device stream + if conf.monitoring.per_device_frame_log_max_history > 0 { + let key = redis_key(format!("device:{{{}}}:stream:frame", ufl.dev_eui)); - c.new_pipeline() - .atomic() - .cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.per_device_frame_log_max_history) - .arg("*") - .arg("up") - .arg(&b) - .ignore() - .cmd("PEXPIRE") - .arg(&key) - .arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize) - .ignore() - .query(&mut c)?; - } + redis::pipe() + .atomic() + .cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.per_device_frame_log_max_history) + .arg("*") + .arg("up") + .arg(&b) + .ignore() + .cmd("PEXPIRE") + .arg(&key) + .arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize) + .ignore() + .query_async(&mut c) + .await?; + } - // global device stream - if conf.monitoring.device_frame_log_max_history > 0 { - let key = redis_key("device:stream:frame".to_string()); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.device_frame_log_max_history) - .arg("*") - .arg("up") - .arg(&b) - .query(&mut *c)?; - } + // global device stream + if conf.monitoring.device_frame_log_max_history > 0 { + let key = redis_key("device:stream:frame".to_string()); + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.device_frame_log_max_history) + .arg("*") + .arg("up") + .arg(&b) + .query_async(&mut c) + .await?; + } - Ok(()) - } - }) - .await? + Ok(()) } pub async fn log_downlink_for_device(dfl: &stream::DownlinkFrameLog) -> Result<()> { @@ -192,51 +178,47 @@ pub async fn log_downlink_for_device(dfl: &stream::DownlinkFrameLog) -> Result<( return Err(anyhow!("dev_eui must be set")); } - task::spawn_blocking({ - let dfl = dfl.clone(); - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; - let b = dfl.encode_to_vec(); + let conf = config::get(); + let mut c = get_async_redis_conn().await?; + let b = dfl.encode_to_vec(); - // per device stream - if conf.monitoring.per_device_frame_log_max_history > 0 { - let key = redis_key(format!("device:{{{}}}:stream:frame", dfl.dev_eui)); + // per device stream + if conf.monitoring.per_device_frame_log_max_history > 0 { + let key = redis_key(format!("device:{{{}}}:stream:frame", dfl.dev_eui)); - c.new_pipeline() - .atomic() - .cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.per_device_frame_log_max_history) - .arg("*") - .arg("down") - .arg(&b) - .ignore() - .cmd("PEXPIRE") - .arg(&key) - .arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize) - .ignore() - .query(&mut c)?; - } + redis::pipe() + .atomic() + .cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.per_device_frame_log_max_history) + .arg("*") + .arg("down") + .arg(&b) + .ignore() + .cmd("PEXPIRE") + .arg(&key) + .arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize) + .ignore() + .query_async(&mut c) + .await?; + } - // global device stream - if conf.monitoring.device_frame_log_max_history > 0 { - let key = redis_key("device:stream:frame".to_string()); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.device_frame_log_max_history) - .arg("*") - .arg("down") - .arg(&b) - .query(&mut *c)?; - } + // global device stream + if conf.monitoring.device_frame_log_max_history > 0 { + let key = redis_key("device:stream:frame".to_string()); + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.device_frame_log_max_history) + .arg("*") + .arg("down") + .arg(&b) + .query_async(&mut c) + .await?; + } - Ok(()) - } - }) - .await? + Ok(()) } pub async fn get_frame_logs( @@ -244,143 +226,138 @@ pub async fn get_frame_logs( count: usize, channel: mpsc::Sender, ) -> Result<()> { - task::spawn_blocking({ - let key = key.clone(); - let channel = channel.clone(); + let mut last_id = "0".to_string(); + let mut c = get_async_redis_conn().await?; - move || -> Result<()> { - let mut last_id = "0".to_string(); - let mut c = get_redis_conn()?; + loop { + if channel.is_closed() { + debug!("Channel has been closed, returning"); + return Ok(()); + } - loop { - if channel.is_closed() { - debug!("Channel has been closed, returning"); - return Ok(()); - } + let srr: StreamReadReply = redis::cmd("XREAD") + .arg("COUNT") + .arg(count) + .arg("STREAMS") + .arg(&key) + .arg(&last_id) + .query_async(&mut c) + .await + .context("XREAD frame stream")?; - let srr: StreamReadReply = redis::cmd("XREAD") - .arg("COUNT") - .arg(count) - .arg("STREAMS") - .arg(&key) - .arg(&last_id) - .query(&mut *c) - .context("XREAD frame stream")?; - - for stream_key in &srr.keys { - for stream_id in &stream_key.ids { - last_id = stream_id.id.clone(); - for (k, v) in &stream_id.map { - let res = || -> Result<()> { - match k.as_ref() { - "up" => { - trace!(key = %k, id = %last_id, "Frame-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = stream::UplinkFrameLog::decode(&mut Cursor::new(b))?; - let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?; - if pl.plaintext_f_opts { - if let Err(e) = phy.decode_f_opts_to_mac_commands() { - warn!(error = %e.full(), "Decode f_opts to mac-commands error"); - } - } - if pl.plaintext_frm_payload { - if let Err(e) = phy.decode_frm_payload() { - warn!(error = %e.full(), "Decode frm_payload error"); - } - } - - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|t| prost_types::Timestamp { - seconds: t.seconds, - nanos: t.nanos, - }), - description: pl.m_type().into(), - body: json!({ - "phy_payload": phy, - "tx_info": pl.tx_info, - "rx_info": pl.rx_info, - }) - .to_string(), - properties: [ - ("DevAddr".to_string(), pl.dev_addr), - ("DevEUI".to_string(), pl.dev_eui), - ] - .iter() - .cloned() - .collect(), - }; - - channel.blocking_send(pl)?; + for stream_key in &srr.keys { + for stream_id in &stream_key.ids { + last_id = stream_id.id.clone(); + for (k, v) in &stream_id.map { + let res = || -> Result<()> { + match k.as_ref() { + "up" => { + trace!(key = %k, id = %last_id, "Frame-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = stream::UplinkFrameLog::decode(&mut Cursor::new(b))?; + let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?; + if pl.plaintext_f_opts { + if let Err(e) = phy.decode_f_opts_to_mac_commands() { + warn!(error = %e.full(), "Decode f_opts to mac-commands error"); } } - "down" => { - trace!(key = %k, id = %last_id, "frame-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = stream::DownlinkFrameLog::decode(&mut Cursor::new(b))?; - let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?; - if pl.plaintext_f_opts { - if let Err(e) = phy.decode_f_opts_to_mac_commands() { - warn!(error = %e.full(), "Decode f_opts to mac-commands error"); - } - } - if pl.plaintext_frm_payload { - if let Err(e) = phy.decode_frm_payload() { - warn!(error = %e.full(), "Decode frm_payload error"); - } - } - - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|t| prost_types::Timestamp { - seconds: t.seconds, - nanos: t.nanos, - }), - description: pl.m_type().into(), - body: json!({ - "phy_payload": phy, - "tx_info": pl.tx_info, - }) - .to_string(), - properties: [ - ("DevAddr".to_string(), pl.dev_addr), - ("DevEUI".to_string(), pl.dev_eui), - ("Gateway ID".to_string(), pl.gateway_id), - ] - .iter() - .cloned() - .collect(), - }; - - channel.blocking_send(pl)?; + if pl.plaintext_frm_payload { + if let Err(e) = phy.decode_frm_payload() { + warn!(error = %e.full(), "Decode frm_payload error"); } } - _ => { - error!(key = %k, "Unexpected key in frame-log stream"); + + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|t| prost_types::Timestamp { + seconds: t.seconds, + nanos: t.nanos, + }), + description: pl.m_type().into(), + body: json!({ + "phy_payload": phy, + "tx_info": pl.tx_info, + "rx_info": pl.rx_info, + }) + .to_string(), + properties: [ + ("DevAddr".to_string(), pl.dev_addr), + ("DevEUI".to_string(), pl.dev_eui), + ] + .iter() + .cloned() + .collect(), + }; + + channel.blocking_send(pl)?; + } + } + "down" => { + trace!(key = %k, id = %last_id, "frame-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = stream::DownlinkFrameLog::decode(&mut Cursor::new(b))?; + let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?; + if pl.plaintext_f_opts { + if let Err(e) = phy.decode_f_opts_to_mac_commands() { + warn!(error = %e.full(), "Decode f_opts to mac-commands error"); + } } + if pl.plaintext_frm_payload { + if let Err(e) = phy.decode_frm_payload() { + warn!(error = %e.full(), "Decode frm_payload error"); + } + } + + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|t| prost_types::Timestamp { + seconds: t.seconds, + nanos: t.nanos, + }), + description: pl.m_type().into(), + body: json!({ + "phy_payload": phy, + "tx_info": pl.tx_info, + }) + .to_string(), + properties: [ + ("DevAddr".to_string(), pl.dev_addr), + ("DevEUI".to_string(), pl.dev_eui), + ("Gateway ID".to_string(), pl.gateway_id), + ] + .iter() + .cloned() + .collect(), + }; + + channel.blocking_send(pl)?; } - - Ok(()) - }(); - - if let Err(e) = res { - // Return in case of channel error, in any other case we just log - // the error. - if e.downcast_ref::>().is_some() { - return Err(e); - } - - error!(key = %k, error = %e.full(), "Parsing frame-log error"); + } + _ => { + error!(key = %k, "Unexpected key in frame-log stream"); } } + + Ok(()) + }(); + + if let Err(e) = res { + // Return in case of channel error, in any other case we just log + // the error. + if e.downcast_ref::>() + .is_some() + { + return Err(e); + } + + error!(key = %k, error = %e.full(), "Parsing frame-log error"); } } - - // If we use xread with block=0, the connection can't be used by other requests. Now we - // check every 1 second if there are new messages, which should be sufficient. - sleep(Duration::from_secs(1)); } - } - }).await? + + // If we use xread with block=0, the connection can't be used by other requests. Now we + // check every 1 second if there are new messages, which should be sufficient. + sleep(Duration::from_secs(1)); + } } diff --git a/chirpstack/src/stream/meta.rs b/chirpstack/src/stream/meta.rs index 05373d7d..847d5f7d 100644 --- a/chirpstack/src/stream/meta.rs +++ b/chirpstack/src/stream/meta.rs @@ -1,60 +1,49 @@ use anyhow::Result; use prost::Message; -use tokio::task; use crate::config; -use crate::storage::{get_redis_conn, redis_key}; +use crate::storage::{get_async_redis_conn, redis_key}; use chirpstack_api::stream; pub async fn log_uplink(up: &stream::UplinkMeta) -> Result<()> { - task::spawn_blocking({ - let up = up.clone(); - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; + let conf = config::get(); + let mut c = get_async_redis_conn().await?; - if conf.monitoring.meta_log_max_history > 0 { - let key = redis_key("stream:meta".to_string()); - let b = up.encode_to_vec(); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.meta_log_max_history) - .arg("*") - .arg("up") - .arg(&b) - .query(&mut *c)?; - } + if conf.monitoring.meta_log_max_history > 0 { + let key = redis_key("stream:meta".to_string()); + let b = up.encode_to_vec(); + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.meta_log_max_history) + .arg("*") + .arg("up") + .arg(&b) + .query_async(&mut c) + .await?; + } - Ok(()) - } - }) - .await? + Ok(()) } pub async fn log_downlink(down: &stream::DownlinkMeta) -> Result<()> { - task::spawn_blocking({ - let down = down.clone(); - move || -> Result<()> { - let conf = config::get(); - let mut c = get_redis_conn()?; + let conf = config::get(); + let mut c = get_async_redis_conn().await?; - if conf.monitoring.meta_log_max_history > 0 { - let key = redis_key("stream:meta".to_string()); - let b = down.encode_to_vec(); + if conf.monitoring.meta_log_max_history > 0 { + let key = redis_key("stream:meta".to_string()); + let b = down.encode_to_vec(); - redis::cmd("XADD") - .arg(&key) - .arg("MAXLEN") - .arg(conf.monitoring.meta_log_max_history) - .arg("*") - .arg("down") - .arg(&b) - .query(&mut *c)?; - } + redis::cmd("XADD") + .arg(&key) + .arg("MAXLEN") + .arg(conf.monitoring.meta_log_max_history) + .arg("*") + .arg("down") + .arg(&b) + .query_async(&mut c) + .await?; + } - Ok(()) - } - }) - .await? + Ok(()) } diff --git a/chirpstack/src/test/assert.rs b/chirpstack/src/test/assert.rs index e74cc125..ee77f86d 100644 --- a/chirpstack/src/test/assert.rs +++ b/chirpstack/src/test/assert.rs @@ -12,7 +12,7 @@ use crate::gateway::backend::mock as gateway_mock; use crate::integration::mock; use crate::storage::{ device::{self, DeviceClass}, - device_queue, device_session, downlink_frame, get_redis_conn, redis_key, + device_queue, device_session, downlink_frame, get_async_redis_conn, redis_key, }; use chirpstack_api::{gw, integration as integration_pb, internal, stream}; use lrwn::EUI64; @@ -397,7 +397,7 @@ pub fn uplink_meta_log(um: stream::UplinkMeta) -> Validator { Box::new(move || { let um = um.clone(); Box::pin(async move { - let mut c = get_redis_conn().unwrap(); + let mut c = get_async_redis_conn().await.unwrap(); let key = redis_key("stream:meta".to_string()); let srr: StreamReadReply = redis::cmd("XREAD") .arg("COUNT") @@ -405,7 +405,8 @@ pub fn uplink_meta_log(um: stream::UplinkMeta) -> Validator { .arg("STREAMS") .arg(&key) .arg("0") - .query(&mut *c) + .query_async(&mut c) + .await .unwrap(); for stream_key in &srr.keys { @@ -433,7 +434,7 @@ pub fn device_uplink_frame_log(uf: stream::UplinkFrameLog) -> Validator { Box::new(move || { let uf = uf.clone(); Box::pin(async move { - let mut c = get_redis_conn().unwrap(); + let mut c = get_async_redis_conn().await.unwrap(); let key = redis_key(format!("device:{{{}}}:stream:frame", uf.dev_eui)); let srr: StreamReadReply = redis::cmd("XREAD") .arg("COUNT") @@ -441,7 +442,8 @@ pub fn device_uplink_frame_log(uf: stream::UplinkFrameLog) -> Validator { .arg("STREAMS") .arg(&key) .arg("0") - .query(&mut *c) + .query_async(&mut c) + .await .unwrap(); for stream_key in &srr.keys { diff --git a/chirpstack/src/test/class_a_pr_test.rs b/chirpstack/src/test/class_a_pr_test.rs index bbec071f..da04a7f1 100644 --- a/chirpstack/src/test/class_a_pr_test.rs +++ b/chirpstack/src/test/class_a_pr_test.rs @@ -37,8 +37,8 @@ async fn test_fns_uplink() { }); config::set(conf); - joinserver::setup().unwrap(); - roaming::setup().unwrap(); + joinserver::setup().await.unwrap(); + roaming::setup().await.unwrap(); let t = tenant::create(tenant::Tenant { name: "tenant".into(), @@ -168,7 +168,6 @@ async fn test_fns_uplink() { sns_pr_start_req_mock.delete(); joinserver::reset(); - roaming::reset(); } #[tokio::test] @@ -188,8 +187,8 @@ async fn test_sns_uplink() { }); config::set(conf); - joinserver::setup().unwrap(); - roaming::setup().unwrap(); + joinserver::setup().await.unwrap(); + roaming::setup().await.unwrap(); let t = tenant::create(tenant::Tenant { name: "tenant".into(), @@ -436,8 +435,8 @@ async fn test_sns_roaming_not_allowed() { }); config::set(conf); - joinserver::setup().unwrap(); - roaming::setup().unwrap(); + joinserver::setup().await.unwrap(); + roaming::setup().await.unwrap(); let t = tenant::create(tenant::Tenant { name: "tenant".into(), @@ -618,8 +617,8 @@ async fn test_sns_dev_not_found() { }); config::set(conf); - joinserver::setup().unwrap(); - roaming::setup().unwrap(); + joinserver::setup().await.unwrap(); + roaming::setup().await.unwrap(); let mut dev_addr = lrwn::DevAddr::from_be_bytes([0, 0, 0, 0]); dev_addr.set_dev_addr_prefix(lrwn::NetID::from_str("000505").unwrap().dev_addr_prefix()); diff --git a/chirpstack/src/test/otaa_js_test.rs b/chirpstack/src/test/otaa_js_test.rs index f375a108..f141f6b6 100644 --- a/chirpstack/src/test/otaa_js_test.rs +++ b/chirpstack/src/test/otaa_js_test.rs @@ -368,7 +368,7 @@ async fn run_test(t: &Test) { }]; config::set(conf); region::setup().unwrap(); - joinserver::setup().unwrap(); + joinserver::setup().await.unwrap(); integration::set_mock().await; gateway_backend::set_backend(&"eu868", Box::new(gateway_backend::mock::Backend {})).await; diff --git a/chirpstack/src/test/otaa_pr_test.rs b/chirpstack/src/test/otaa_pr_test.rs index c93eec52..bc8604fb 100644 --- a/chirpstack/src/test/otaa_pr_test.rs +++ b/chirpstack/src/test/otaa_pr_test.rs @@ -46,8 +46,8 @@ async fn test_fns() { }]; config::set(conf); - joinserver::setup().unwrap(); - roaming::setup().unwrap(); + joinserver::setup().await.unwrap(); + roaming::setup().await.unwrap(); let t = tenant::create(tenant::Tenant { name: "tenant".into(), @@ -247,7 +247,6 @@ async fn test_fns() { .await; joinserver::reset(); - roaming::reset(); } #[tokio::test] @@ -269,8 +268,8 @@ async fn test_sns() { }); config::set(conf); - joinserver::setup().unwrap(); - roaming::setup().unwrap(); + joinserver::setup().await.unwrap(); + roaming::setup().await.unwrap(); let t = tenant::create(tenant::Tenant { name: "tenant".into(), @@ -432,7 +431,6 @@ async fn test_sns() { ); joinserver::reset(); - roaming::reset(); } #[tokio::test] @@ -454,8 +452,8 @@ async fn test_sns_roaming_not_allowed() { }); config::set(conf); - joinserver::setup().unwrap(); - roaming::setup().unwrap(); + joinserver::setup().await.unwrap(); + roaming::setup().await.unwrap(); let t = tenant::create(tenant::Tenant { name: "tenant".into(), @@ -591,5 +589,4 @@ async fn test_sns_roaming_not_allowed() { ); joinserver::reset(); - roaming::reset(); } diff --git a/chirpstack/src/uplink/data_fns.rs b/chirpstack/src/uplink/data_fns.rs index 02ae527c..93188727 100644 --- a/chirpstack/src/uplink/data_fns.rs +++ b/chirpstack/src/uplink/data_fns.rs @@ -125,7 +125,7 @@ impl Data { }; let net_id = NetID::from_slice(&ds.net_id)?; - let client = roaming::get(&net_id)?; + let client = roaming::get(&net_id).await?; let async_receiver = match client.is_async() { false => None, true => Some( @@ -183,7 +183,7 @@ impl Data { pr_req.base.transaction_id = 1234; } - let client = roaming::get(&net_id)?; + let client = roaming::get(&net_id).await?; let async_receiver = match client.is_async() { false => None, true => Some( diff --git a/chirpstack/src/uplink/join_fns.rs b/chirpstack/src/uplink/join_fns.rs index fe40145a..4d361175 100644 --- a/chirpstack/src/uplink/join_fns.rs +++ b/chirpstack/src/uplink/join_fns.rs @@ -40,7 +40,7 @@ impl JoinRequest { ctx.filter_rx_info_by_public_only()?; ctx.get_home_net_id().await?; - ctx.get_client()?; + ctx.get_client().await?; ctx.start_roaming().await?; ctx.save_roaming_session().await?; @@ -94,10 +94,10 @@ impl JoinRequest { Ok(()) } - fn get_client(&mut self) -> Result<()> { + async fn get_client(&mut self) -> Result<()> { let net_id = self.home_net_id.as_ref().unwrap(); trace!(net_id = %net_id, "Getting backend interfaces client"); - self.client = Some(roaming::get(net_id)?); + self.client = Some(roaming::get(net_id).await?); Ok(()) } diff --git a/chirpstack/src/uplink/mod.rs b/chirpstack/src/uplink/mod.rs index e1958985..b37e4a5a 100644 --- a/chirpstack/src/uplink/mod.rs +++ b/chirpstack/src/uplink/mod.rs @@ -10,7 +10,6 @@ use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prost::Message; -use tokio::task; use tokio::time::sleep; use tracing::{debug, error, info, span, trace, warn, Instrument, Level}; use uuid::Uuid; @@ -19,7 +18,7 @@ use crate::config; use crate::helpers::errors::PrintFullError; use crate::monitoring::prometheus; use crate::storage::{ - device, device_profile, error::Error as StorageError, gateway, get_redis_conn, redis_key, + device, device_profile, error::Error as StorageError, gateway, get_async_redis_conn, redis_key, }; use crate::stream; use chirpstack_api::{common, gw, internal, stream as stream_pb}; @@ -221,92 +220,76 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> { } async fn deduplicate_put(key: &str, ttl: Duration, event: &gw::UplinkFrame) -> Result<()> { - task::spawn_blocking({ - let key = key.to_string(); - let event_b = event.encode_to_vec(); - move || -> Result<()> { - let mut c = get_redis_conn()?; + let event_b = event.encode_to_vec(); + let mut c = get_async_redis_conn().await?; - c.new_pipeline() - .atomic() - .cmd("SADD") - .arg(&key) - .arg(event_b) - .ignore() - .cmd("PEXPIRE") - .arg(&key) - .arg(ttl.as_millis() as usize) - .ignore() - .query(&mut c) - .context("Deduplication put")?; + redis::pipe() + .atomic() + .cmd("SADD") + .arg(key) + .arg(event_b) + .ignore() + .cmd("PEXPIRE") + .arg(key) + .arg(ttl.as_millis() as usize) + .ignore() + .query_async(&mut c) + .await + .context("Deduplication put")?; - Ok(()) - } - }) - .await? + Ok(()) } async fn deduplicate_locked(key: &str, ttl: Duration) -> Result { - task::spawn_blocking({ - let key = key.to_string(); - move || -> Result { - let mut c = get_redis_conn()?; + let mut c = get_async_redis_conn().await?; - let set: bool = redis::cmd("SET") - .arg(key) - .arg("lock") - .arg("PX") - .arg(ttl.as_millis() as usize) - .arg("NX") - .query(&mut *c) - .context("Deduplication locked")?; + let set: bool = redis::cmd("SET") + .arg(key) + .arg("lock") + .arg("PX") + .arg(ttl.as_millis() as usize) + .arg("NX") + .query_async(&mut c) + .await + .context("Deduplication locked")?; - Ok(!set) - } - }) - .await? + Ok(!set) } async fn deduplicate_collect(key: &str) -> Result { - task::spawn_blocking({ - let key = key.to_string(); - move || -> Result { - let mut c = get_redis_conn()?; - let items_b: Vec> = redis::cmd("SMEMBERS") - .arg(&key) - .query(&mut *c) - .context("Deduplication collect")?; + let mut c = get_async_redis_conn().await?; + let items_b: Vec> = redis::cmd("SMEMBERS") + .arg(&key) + .query_async(&mut c) + .await + .context("Deduplication collect")?; - if items_b.is_empty() { - return Err(anyhow!("Zero items in collect set")); - } + if items_b.is_empty() { + return Err(anyhow!("Zero items in collect set")); + } - let mut pl = gw::UplinkFrameSet { - ..Default::default() - }; + let mut pl = gw::UplinkFrameSet { + ..Default::default() + }; - for b in items_b { - let event = - gw::UplinkFrame::decode(&mut Cursor::new(b)).context("Decode UplinkFrame")?; + for b in items_b { + let event = gw::UplinkFrame::decode(&mut Cursor::new(b)).context("Decode UplinkFrame")?; - if event.tx_info.is_none() { - warn!("tx_info of uplink event is empty, skipping"); - continue; - } - if event.rx_info.is_none() { - warn!("rx_info of uplink event is empty, skipping"); - continue; - } - - pl.tx_info = event.tx_info; - pl.rx_info.push(event.rx_info.unwrap()); - pl.phy_payload = event.phy_payload; - } - - Ok(pl) + if event.tx_info.is_none() { + warn!("tx_info of uplink event is empty, skipping"); + continue; } - }) - .await? + if event.rx_info.is_none() { + warn!("rx_info of uplink event is empty, skipping"); + continue; + } + + pl.tx_info = event.tx_info; + pl.rx_info.push(event.rx_info.unwrap()); + pl.phy_payload = event.phy_payload; + } + + Ok(pl) } pub async fn handle_uplink(deduplication_id: Uuid, uplink: gw::UplinkFrameSet) -> Result<()> {