Refactor code to use async redis.

This commit is contained in:
Orne Brocaar 2023-11-28 13:23:15 +00:00
parent 8e2eda3d5b
commit 345d0d8462
35 changed files with 1367 additions and 1660 deletions

Cargo.lock (generated) · 63 changed lines
View File

@ -758,6 +758,8 @@ dependencies = [
"chirpstack_api",
"chrono",
"clap",
"deadpool-redis",
"deadpool-redis-cluster",
"diesel",
"diesel-async",
"diesel_migrations",
@ -789,7 +791,6 @@ dependencies = [
"prometheus-client",
"prost",
"prost-types",
"r2d2",
"rand",
"rand_core",
"rdkafka",
@ -1191,11 +1192,35 @@ dependencies = [
"tokio",
]
[[package]]
name = "deadpool-redis"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84930e585871d35b8e06d3e03d03e3a8a4c5dc71afa4376c7cd5f9223e1da1ea"
dependencies = [
"deadpool",
"redis",
]
[[package]]
name = "deadpool-redis-cluster"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "857c968579c82072dff24f48969d24fb1daab4970d94bb740d87ebb13bc8c2c4"
dependencies = [
"deadpool",
"redis",
"redis_cluster_async",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49"
dependencies = [
"tokio",
]
[[package]]
name = "der"
@ -3354,17 +3379,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "rand"
version = "0.8.5"
@ -3461,7 +3475,6 @@ dependencies = [
"log",
"percent-encoding",
"pin-project-lite",
"r2d2",
"rand",
"rustls",
"rustls-native-certs",
@ -3474,6 +3487,21 @@ dependencies = [
"url",
]
[[package]]
name = "redis_cluster_async"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "093073cc58cbe376f3308c530edcda1a49ef980de1c32f3fa63622fc5c6f0fb9"
dependencies = [
"crc16",
"futures",
"log",
"pin-project-lite",
"rand",
"redis",
"tokio",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@ -3832,15 +3860,6 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot",
]
[[package]]
name = "scoped-futures"
version = "0.1.3"

View File

@ -2,9 +2,7 @@
extern crate anyhow;
use std::fs::File;
use std::future::Future;
use std::io::Read;
use std::pin::Pin;
use std::time::Duration;
use aes_kw::Kek;
@ -13,6 +11,7 @@ use chrono::{DateTime, Utc};
use reqwest::header::{HeaderMap, AUTHORIZATION, CONTENT_TYPE};
use reqwest::{Certificate, Identity};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver;
use tracing::{debug, error, info, span, trace, Instrument, Level};
@ -35,14 +34,6 @@ pub trait BasePayloadResultProvider {
fn base_payload(&self) -> &BasePayloadResult;
}
pub type RequestLogFn = Box<
dyn Fn(
stream::BackendInterfacesRequest,
) -> Pin<Box<dyn Future<Output = Result<()>> + Sync + Send>>
+ Sync
+ Send,
>;
pub struct ClientConfig {
pub sender_id: Vec<u8>,
pub receiver_id: Vec<u8>,
@ -63,7 +54,7 @@ pub struct ClientConfig {
pub use_target_role_suffix: bool,
// Request log function.
pub request_log_fn: Option<RequestLogFn>,
pub request_log_sender: Option<Sender<stream::BackendInterfacesRequest>>,
}
impl Default for ClientConfig {
@ -78,7 +69,7 @@ impl Default for ClientConfig {
authorization: None,
async_timeout: Duration::from_secs(0),
use_target_role_suffix: false,
request_log_fn: None,
request_log_sender: None,
}
}
}
@ -345,9 +336,12 @@ impl Client {
be_req_log.request_error = format!("{:#}", e);
}
if let Some(log_fn) = &self.config.request_log_fn {
if let Err(e) = log_fn(be_req_log).await {
error!(error = %e, "Log request error");
if let Some(tx) = &self.config.request_log_sender {
// We use try_send here as we don't want to delay the response in case
// there is no channel capacity. This would also log an error, providing
// feedback that there is a channel capacity issue.
if let Err(e) = tx.try_send(be_req_log) {
error!(error = %e, "Sending request-log to stream error");
}
}
@ -1452,10 +1446,7 @@ pub mod test {
let c = Client::new(ClientConfig {
sender_id: vec![1, 2, 3],
server: server.url("/"),
request_log_fn: Some(Box::new(move |log| {
let tx = tx.clone();
Box::pin(async move { tx.send(log).await.map_err(|e| anyhow!("{}", e)) })
})),
request_log_sender: Some(tx),
..Default::default()
})
.unwrap();
@ -1517,10 +1508,7 @@ pub mod test {
let c = Client::new(ClientConfig {
sender_id: vec![1, 2, 3],
server: server.url("/"),
request_log_fn: Some(Box::new(move |log| {
let tx = tx.clone();
Box::pin(async move { tx.send(log).await.map_err(|e| anyhow!("{}", e)) })
})),
request_log_sender: Some(tx),
..Default::default()
})
.unwrap();
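
The hunks above replace the boxed request_log_fn callback with a bounded tokio mpsc channel: the client hands each BackendInterfacesRequest to request_log_sender via try_send, so a full channel drops the log (and reports an error) instead of delaying the HTTP response. A minimal, self-contained sketch of that pattern follows; the RequestLog struct is an illustrative stand-in for stream::BackendInterfacesRequest, not part of the commit.

use std::time::Duration;
use tokio::sync::mpsc;

// Illustrative stand-in for chirpstack_api::stream::BackendInterfacesRequest.
#[derive(Debug)]
struct RequestLog {
    path: String,
}

#[tokio::main]
async fn main() {
    // Bounded channel; get_log_sender() later in this commit uses the same capacity.
    let (tx, mut rx) = mpsc::channel::<RequestLog>(100);

    // Receiver side: drain the channel in the background and persist each log
    // (in ChirpStack this is where log_request() writes to the Redis stream).
    tokio::spawn(async move {
        while let Some(log) = rx.recv().await {
            println!("persisting request log for {}", log.path);
        }
    });

    // Sender side: try_send never awaits. If the channel is full the log is
    // dropped and an error is reported, so the response is never delayed.
    if let Err(e) = tx.try_send(RequestLog { path: "/".into() }) {
        eprintln!("Sending request-log to stream error: {}", e);
    }

    // Give the background task a moment to run before the example exits.
    tokio::time::sleep(Duration::from_millis(50)).await;
}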

View File

@ -36,9 +36,10 @@ diesel_migrations = { version = "2.1" }
diesel-async = { version = "0.4", features = ["deadpool", "postgres", "async-connection-wrapper"] }
tokio-postgres = "0.7"
tokio-postgres-rustls = "0.10.0"
r2d2 = "0.8"
bigdecimal = "0.4"
redis = { version = "0.23", features = ["r2d2", "cluster", "tls-rustls"] }
redis = { version = "0.23", features = ["cluster", "tls-rustls", "tokio-rustls-comp"] }
deadpool-redis = "0.13"
deadpool-redis-cluster = "0.1"
# Logging
tracing = "0.1"
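
r2d2 is dropped in favour of deadpool-redis (plus deadpool-redis-cluster), and the redis crate trades its r2d2 feature for tokio-rustls-comp so that commands can be issued asynchronously. A minimal sketch of what the new dependencies look like in use, assuming deadpool-redis 0.13's Config/Runtime API and a Redis server on localhost; the actual pool setup appears in the storage module further down.

use deadpool_redis::{Config, Runtime};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Build an async pool from a Redis URL (a cluster setup would use
    // deadpool-redis-cluster instead).
    let pool = Config::from_url("redis://127.0.0.1/").create_pool(Some(Runtime::Tokio1))?;

    // Pooled connections implement redis::aio::ConnectionLike, so commands use
    // query_async instead of the blocking query.
    let mut conn = pool.get().await?;
    let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
    assert_eq!(pong, "PONG");
    Ok(())
}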

View File

@ -18,7 +18,7 @@ use crate::backend::{joinserver, keywrap, roaming};
use crate::downlink::data_fns;
use crate::helpers::errors::PrintFullError;
use crate::storage::{
device_session, error::Error as StorageError, get_redis_conn, passive_roaming, redis_key,
device_session, error::Error as StorageError, get_async_redis_conn, passive_roaming, redis_key,
};
use crate::uplink::{
data_sns, error::Error as UplinkError, helpers, join_sns, RoamingMetaData, UplinkFrameSet,
@ -127,7 +127,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
}
};
match roaming::get(&sender_id) {
match roaming::get(&sender_id).await {
Ok(v) => v,
Err(_) => {
warn!("Unknown SenderID");
@ -523,33 +523,26 @@ async fn _handle_xmit_data_req(
}
async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<hyper::Body>> {
task::spawn_blocking({
let b = b.to_vec();
let transaction_id = bp.transaction_id;
move || -> Result<()> {
let mut c = get_redis_conn()?;
let key = redis_key(format!("backend:async:{}", transaction_id));
let transaction_id = bp.transaction_id;
let mut c = get_async_redis_conn().await?;
let key = redis_key(format!("backend:async:{}", transaction_id));
c.new_pipeline()
.atomic()
.cmd("XADD")
.arg(&key)
.arg("MAXLEN")
.arg(1_i64)
.arg("*")
.arg("pl")
.arg(&b)
.ignore()
.cmd("EXPIRE")
.arg(&key)
.arg(30_i64)
.ignore()
.query(&mut c)?;
Ok(())
}
})
.await??;
redis::pipe()
.atomic()
.cmd("XADD")
.arg(&key)
.arg("MAXLEN")
.arg(1_i64)
.arg("*")
.arg("pl")
.arg(b)
.ignore()
.cmd("EXPIRE")
.arg(&key)
.arg(30_i64)
.ignore()
.query_async(&mut c)
.await?;
Ok(warp::reply().into_response())
}
@ -560,11 +553,17 @@ pub async fn get_async_receiver(
) -> Result<oneshot::Receiver<Vec<u8>>> {
let (tx, rx) = oneshot::channel();
task::spawn_blocking(move || -> Result<()> {
let mut c = get_redis_conn()?;
task::spawn(async move {
let mut c = match get_async_redis_conn().await {
Ok(v) => v,
Err(e) => {
error!(error = %e, "Get Redis connection error");
return;
}
};
let key = redis_key(format!("backend:async:{}", transaction_id));
let srr: StreamReadReply = redis::cmd("XREAD")
let srr: StreamReadReply = match redis::cmd("XREAD")
.arg("BLOCK")
.arg(timeout.as_millis() as u64)
.arg("COUNT")
@ -572,7 +571,15 @@ pub async fn get_async_receiver(
.arg("STREAMS")
.arg(&key)
.arg("0")
.query(&mut *c)?;
.query_async(&mut c)
.await
{
Ok(v) => v,
Err(e) => {
error!(error = %e, "Read from Redis Stream error");
return;
}
};
for stream_key in &srr.keys {
for stream_id in &stream_key.ids {
@ -581,7 +588,7 @@ pub async fn get_async_receiver(
"pl" => {
if let redis::Value::Data(b) = v {
let _ = tx.send(b.to_vec());
return Ok(());
return;
}
}
_ => {
@ -595,8 +602,6 @@ pub async fn get_async_receiver(
}
}
}
Ok(())
});
Ok(rx)
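
handle_async_ans and get_async_receiver together use a per-transaction Redis stream as a one-shot mailbox: the answer is XADDed with MAXLEN 1 and a 30-second EXPIRE, and a spawned task issues a blocking XREAD for it, forwarding the payload over a oneshot channel once it arrives. A condensed sketch of the two halves, assuming a connection implementing redis::aio::ConnectionLike (such as the one returned by get_async_redis_conn) and the redis 0.23 streams API; the helper names are illustrative.

use anyhow::Result;
use redis::streams::StreamReadReply;

// Writer half (cf. handle_async_ans): publish the async answer under the
// per-transaction key, keeping at most one entry and letting the key expire.
async fn publish_answer<C: redis::aio::ConnectionLike>(
    c: &mut C,
    key: &str,
    payload: &[u8],
) -> Result<()> {
    redis::pipe()
        .atomic()
        .cmd("XADD")
        .arg(key)
        .arg("MAXLEN")
        .arg(1_i64)
        .arg("*")
        .arg("pl")
        .arg(payload)
        .ignore()
        .cmd("EXPIRE")
        .arg(key)
        .arg(30_i64)
        .ignore()
        .query_async::<_, ()>(c)
        .await?;
    Ok(())
}

// Reader half (cf. get_async_receiver): block for up to timeout_ms waiting for
// the answer, then hand the raw payload back to the caller.
async fn wait_for_answer<C: redis::aio::ConnectionLike>(
    c: &mut C,
    key: &str,
    timeout_ms: u64,
) -> Result<Option<Vec<u8>>> {
    let srr: StreamReadReply = redis::cmd("XREAD")
        .arg("BLOCK")
        .arg(timeout_ms)
        .arg("COUNT")
        .arg(1_usize)
        .arg("STREAMS")
        .arg(key)
        .arg("0")
        .query_async(c)
        .await?;
    for stream_key in &srr.keys {
        for stream_id in &stream_key.ids {
            if let Some(redis::Value::Data(b)) = stream_id.map.get("pl") {
                return Ok(Some(b.clone()));
            }
        }
    }
    Ok(None)
}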

View File

@ -63,12 +63,6 @@ impl ToStatus for uuid::Error {
}
}
impl ToStatus for r2d2::Error {
fn status(&self) -> Status {
Status::new(Code::Internal, format!("{:#}", self))
}
}
impl ToStatus for lrwn::Error {
fn status(&self) -> Status {
Status::new(Code::Internal, format!("{:#}", self))

View File

@ -3,13 +3,12 @@ use std::net::SocketAddr;
use anyhow::{Context, Result};
use diesel_async::RunQueryDsl;
use tokio::task;
use tracing::info;
use warp::{http::Response, http::StatusCode, Filter};
use crate::config;
use crate::monitoring::prometheus;
use crate::storage::{get_async_db_conn, get_redis_conn};
use crate::storage::{get_async_db_conn, get_async_redis_conn};
pub async fn setup() {
let conf = config::get();
@ -56,12 +55,8 @@ async fn _health_handler() -> Result<()> {
.await
.context("PostgreSQL connection error")?;
task::spawn_blocking(move || -> Result<()> {
let mut r = get_redis_conn()?;
if !r.check_connection() {
return Err(anyhow!("Redis connection error"));
}
Ok(())
})
.await?
let mut r = get_async_redis_conn().await?;
let _: String = redis::cmd("PING").query_async(&mut r).await?;
Ok(())
}

View File

@ -13,13 +13,12 @@ use openidconnect::{
};
use openidconnect::{EmptyAdditionalClaims, UserInfoClaims};
use serde::{Deserialize, Serialize};
use tokio::task;
use tracing::{error, trace};
use warp::{Rejection, Reply};
use crate::config;
use crate::helpers::errors::PrintFullError;
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
pub type User = UserInfoClaims<EmptyAdditionalClaims, CoreGenderClaim>;
@ -133,41 +132,30 @@ async fn get_client() -> Result<CoreClient> {
}
async fn store_nonce(state: &CsrfToken, nonce: &Nonce) -> Result<()> {
task::spawn_blocking({
let state = state.clone();
let nonce = nonce.clone();
move || -> Result<()> {
trace!("Storing nonce");
let key = redis_key(format!("auth:oidc:{}", state.secret()));
let mut c = get_redis_conn()?;
trace!("Storing nonce");
let key = redis_key(format!("auth:oidc:{}", state.secret()));
let mut c = get_async_redis_conn().await?;
redis::cmd("PSETEX")
.arg(key)
.arg(Duration::minutes(5).num_milliseconds())
.arg(nonce.secret())
.query(&mut *c)?;
redis::cmd("PSETEX")
.arg(key)
.arg(Duration::minutes(5).num_milliseconds())
.arg(nonce.secret())
.query_async(&mut c)
.await?;
Ok(())
}
})
.await?
Ok(())
}
async fn get_nonce(state: &CsrfToken) -> Result<Nonce> {
task::spawn_blocking({
let state = state.clone();
move || -> Result<Nonce> {
trace!("Getting nonce");
let key = redis_key(format!("auth:oidc:{}", state.secret()));
let mut c = get_redis_conn()?;
trace!("Getting nonce");
let key = redis_key(format!("auth:oidc:{}", state.secret()));
let mut c = get_async_redis_conn().await?;
let v: String = redis::cmd("GET")
.arg(&key)
.query(&mut *c)
.context("Get nonce")?;
let v: String = redis::cmd("GET")
.arg(&key)
.query_async(&mut c)
.await
.context("Get nonce")?;
Ok(Nonce::new(v))
}
})
.await?
Ok(Nonce::new(v))
}

View File

@ -11,7 +11,7 @@ lazy_static! {
static ref CLIENTS: RwLock<Vec<(EUI64Prefix, Arc<Client>)>> = RwLock::new(vec![]);
}
pub fn setup() -> Result<()> {
pub async fn setup() -> Result<()> {
info!("Setting up Join Server clients");
let conf = config::get();
@ -28,9 +28,7 @@ pub fn setup() -> Result<()> {
tls_cert: js.tls_cert.clone(),
tls_key: js.tls_key.clone(),
async_timeout: js.async_timeout,
request_log_fn: Some(Box::new(move |log| {
Box::pin(async move { stream::backend_interfaces::log_request(log).await })
})),
request_log_sender: stream::backend_interfaces::get_log_sender().await,
..Default::default()
})?;

View File

@ -4,9 +4,9 @@ pub mod joinserver;
pub mod keywrap;
pub mod roaming;
pub fn setup() -> Result<()> {
joinserver::setup()?;
roaming::setup()?;
pub async fn setup() -> Result<()> {
joinserver::setup().await?;
roaming::setup().await?;
Ok(())
}

View File

@ -1,11 +1,12 @@
use std::collections::HashMap;
use std::io::Cursor;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use anyhow::Result;
use chrono::{Duration, DurationRound};
use prost::Message;
use tokio::sync::RwLock;
use tracing::{debug, info, span, Level};
use crate::gpstime::ToGpsTime;
@ -18,7 +19,7 @@ lazy_static! {
static ref CLIENTS: RwLock<HashMap<NetID, Arc<Client>>> = RwLock::new(HashMap::new());
}
pub fn setup() -> Result<()> {
pub async fn setup() -> Result<()> {
info!("Setting up roaming clients");
let conf = config::get();
@ -56,26 +57,24 @@ pub fn setup() -> Result<()> {
Some(s.authorization_header.clone())
},
async_timeout: s.async_timeout,
request_log_fn: Some(Box::new(move |log| {
Box::pin(async move { stream::backend_interfaces::log_request(log).await })
})),
request_log_sender: stream::backend_interfaces::get_log_sender().await,
})?;
set(&s.net_id, c);
set(&s.net_id, c).await;
}
Ok(())
}
pub fn set(net_id: &NetID, c: Client) {
let mut clients_w = CLIENTS.write().unwrap();
pub async fn set(net_id: &NetID, c: Client) {
let mut clients_w = CLIENTS.write().await;
clients_w.insert(*net_id, Arc::new(c));
}
pub fn get(net_id: &NetID) -> Result<Arc<Client>> {
let clients_r = CLIENTS.write().unwrap();
pub async fn get(net_id: &NetID) -> Result<Arc<Client>> {
let mut clients_w = CLIENTS.write().await;
if let Some(client) = clients_r.get(net_id) {
if let Some(client) = clients_w.get(net_id) {
return Ok(client.clone());
}
@ -106,12 +105,14 @@ pub fn get(net_id: &NetID) -> Result<Arc<Client>> {
Some(conf.roaming.default.authorization_header.clone())
},
async_timeout: conf.roaming.default.async_timeout,
request_log_fn: Some(Box::new(move |log| {
Box::pin(async move { stream::backend_interfaces::log_request(log).await })
})),
request_log_sender: stream::backend_interfaces::get_log_sender().await,
})?;
return Ok(Arc::new(c));
let c = Arc::new(c);
let c_out = c.clone();
clients_w.insert(*net_id, c);
return Ok(c_out);
}
Err(anyhow!(
@ -330,8 +331,8 @@ pub fn dl_meta_data_to_uplink_rx_info(
}
#[cfg(test)]
pub fn reset() {
let mut clients_w = CLIENTS.write().unwrap();
pub async fn reset() {
let mut clients_w = CLIENTS.write().await;
*clients_w = HashMap::new();
}
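
The roaming client registry moves from std::sync::RwLock to tokio::sync::RwLock because get() now builds and caches the default client while holding the write guard, and that construction awaits (get_log_sender().await); a blocking std guard is not suited to being held across an .await. A small sketch of the lazy-insert pattern under a tokio RwLock; the string key and build_client helper are placeholders, not the commit's types.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// Placeholder for constructing a roaming Client (in the commit this awaits
// stream::backend_interfaces::get_log_sender()).
async fn build_client() -> String {
    "client".to_string()
}

#[tokio::main]
async fn main() {
    let clients: Arc<RwLock<HashMap<String, Arc<String>>>> =
        Arc::new(RwLock::new(HashMap::new()));

    let mut clients_w = clients.write().await;
    if !clients_w.contains_key("000000") {
        // Awaiting while the write guard is held is fine with tokio's RwLock;
        // this mirrors how roaming::get() caches the lazily-built client.
        let c = Arc::new(build_client().await);
        clients_w.insert("000000".to_string(), c);
    }
    println!("cached clients: {}", clients_w.len());
}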

View File

@ -13,7 +13,7 @@ pub async fn run() -> Result<()> {
storage::setup().await?;
region::setup()?;
backend::setup()?;
backend::setup().await?;
adr::setup().await?;
integration::setup().await?;
gateway::backend::setup().await?;

View File

@ -1044,7 +1044,7 @@ impl Data {
let roaming_meta = ufs.roaming_meta_data.as_ref().unwrap();
let net_id = NetID::from_slice(&roaming_meta.base_payload.sender_id)?;
let client = roaming::get(&net_id)?;
let client = roaming::get(&net_id).await?;
let mut req = backend::XmitDataReqPayload {
phy_payload: self.downlink_frame.items[0].phy_payload.clone(),

View File

@ -19,13 +19,12 @@ use prost::Message;
use rand::Rng;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::task;
use tracing::{error, info, trace};
use super::GatewayBackend;
use crate::config::GatewayBackendMqtt;
use crate::monitoring::prometheus;
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
use crate::{downlink, uplink};
use lrwn::region::CommonName;
@ -462,22 +461,18 @@ async fn message_callback(
}
async fn is_locked(key: String) -> Result<bool> {
task::spawn_blocking({
move || -> Result<bool> {
let mut c = get_redis_conn()?;
let mut c = get_async_redis_conn().await?;
let set: bool = redis::cmd("SET")
.arg(key)
.arg("lock")
.arg("PX")
.arg(5000)
.arg("NX")
.query(&mut *c)?;
let set: bool = redis::cmd("SET")
.arg(key)
.arg("lock")
.arg("PX")
.arg(5000)
.arg("NX")
.query_async(&mut c)
.await?;
Ok(!set)
}
})
.await?
Ok(!set)
}
fn gateway_is_json(gateway_id: &str) -> bool {

View File

@ -3,75 +3,67 @@ use std::io::Cursor;
use anyhow::{Context, Result};
use chrono::{DateTime, Duration, Utc};
use prost::Message;
use tokio::task;
use tracing::{info, trace};
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
use chirpstack_api::{gw, internal};
use lrwn::EUI64;
pub async fn get_geoloc_buffer(
dev_eui: &EUI64,
ttl: &Duration,
ttl: Duration,
) -> Result<Vec<Vec<gw::UplinkRxInfo>>> {
if *ttl == Duration::zero() {
if ttl == Duration::zero() {
return Ok(Vec::new());
}
task::spawn_blocking({
let dev_eui = *dev_eui;
let ttl = *ttl;
trace!(dev_eui = %dev_eui, "Getting geolocation buffer");
let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui));
let mut c = get_async_redis_conn().await?;
move || -> Result<Vec<Vec<gw::UplinkRxInfo>>> {
trace!(dev_eui = %dev_eui, "Getting geolocation buffer");
let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui));
let mut c = get_redis_conn()?;
let b: Vec<u8> = redis::cmd("GET")
.arg(key)
.query_async(&mut c)
.await
.context("Get geolocation buffer")?;
if b.is_empty() {
return Ok(Vec::new());
}
let b: Vec<u8> = redis::cmd("GET")
.arg(key)
.query(&mut *c)
.context("Get geolocation buffer")?;
if b.is_empty() {
return Ok(Vec::new());
}
let buffer = internal::LoraCloudGeolocBuffer::decode(&mut Cursor::new(b))
.context("Decode geolocation buffer")?;
let buffer = internal::LoraCloudGeolocBuffer::decode(&mut Cursor::new(b))
.context("Decode geolocation buffer")?;
let mut out: Vec<Vec<gw::UplinkRxInfo>> = Vec::new();
let mut out: Vec<Vec<gw::UplinkRxInfo>> = Vec::new();
for uplink in &buffer.uplinks {
let rx_info: Vec<gw::UplinkRxInfo> = uplink
.rx_info
.iter()
.cloned()
.filter(|rx_info| {
let ts: DateTime<Utc> = match &rx_info.gw_time {
None => {
return false;
}
Some(v) => match v.clone().try_into() {
Ok(v) => v,
Err(_) => {
return false;
}
},
};
for uplink in &buffer.uplinks {
let rx_info: Vec<gw::UplinkRxInfo> = uplink
.rx_info
.iter()
.cloned()
.filter(|rx_info| {
let ts: DateTime<Utc> = match &rx_info.gw_time {
None => {
return false;
}
Some(v) => match v.clone().try_into() {
Ok(v) => v,
Err(_) => {
return false;
}
},
};
// The interval between now and then must be smaller than the TTL
(ts - Utc::now()) < ttl
})
.collect();
// The interval between now and then must be smaller than the TTL
(ts - Utc::now()) < ttl
})
.collect();
if rx_info.len() > 3 {
out.push(rx_info);
}
}
Ok(out)
if rx_info.len() > 3 {
out.push(rx_info);
}
})
.await?
}
Ok(out)
}
pub async fn save_geoloc_buffer(
@ -83,35 +75,27 @@ pub async fn save_geoloc_buffer(
return Ok(());
}
task::spawn_blocking({
let dev_eui = *dev_eui;
let ttl = *ttl;
let items = items.to_vec();
trace!(dev_eui = %dev_eui, "Saving geolocation buffer");
let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui));
let mut c = get_async_redis_conn().await?;
move || -> Result<()> {
trace!(dev_eui = %dev_eui, "Saving geolocation buffer");
let key = redis_key(format!("device:{{{}}}:loracloud:buffer", dev_eui));
let mut c = get_redis_conn()?;
let buffer = internal::LoraCloudGeolocBuffer {
uplinks: items
.iter()
.cloned()
.map(|rx_info| internal::LoraCloudGeolocBufferUplink { rx_info })
.collect(),
};
let b = buffer.encode_to_vec();
let buffer = internal::LoraCloudGeolocBuffer {
uplinks: items
.iter()
.cloned()
.map(|rx_info| internal::LoraCloudGeolocBufferUplink { rx_info })
.collect(),
};
let b = buffer.encode_to_vec();
redis::cmd("PSETEX")
.arg(key)
.arg(ttl.num_milliseconds())
.arg(b)
.query_async(&mut c)
.await?;
redis::cmd("PSETEX")
.arg(key)
.arg(ttl.num_milliseconds())
.arg(b)
.query(&mut *c)?;
info!(dev_eui = %dev_eui, "Geolocation buffer saved");
info!(dev_eui = %dev_eui, "Geolocation buffer saved");
Ok(())
}
})
.await?
Ok(())
}

View File

@ -461,7 +461,7 @@ impl Integration {
);
let mut buf = vec![pl.rx_info.clone()];
buf.extend_from_slice(&buffer::get_geoloc_buffer(&dev_eui, &ttl).await?);
buf.extend_from_slice(&buffer::get_geoloc_buffer(&dev_eui, ttl).await?);
buf.truncate(
(self
.config

View File

@ -128,7 +128,7 @@ impl IntegrationTrait for Integration {
#[cfg(test)]
pub mod test {
use super::*;
use crate::storage::get_redis_conn;
use crate::storage::get_async_redis_conn;
use crate::test;
use chirpstack_api::integration;
use redis::streams::StreamReadReply;
@ -150,7 +150,7 @@ pub mod test {
..Default::default()
};
i.uplink_event(&HashMap::new(), &pl).await.unwrap();
last_id = assert_reply(&last_id, "up", &pl.encode_to_vec());
last_id = assert_reply(&last_id, "up", &pl.encode_to_vec()).await;
// join
let pl = integration::JoinEvent {
@ -162,7 +162,7 @@ pub mod test {
..Default::default()
};
i.join_event(&HashMap::new(), &pl).await.unwrap();
last_id = assert_reply(&last_id, "join", &pl.encode_to_vec());
last_id = assert_reply(&last_id, "join", &pl.encode_to_vec()).await;
// ack
let pl = integration::AckEvent {
@ -174,7 +174,7 @@ pub mod test {
..Default::default()
};
i.ack_event(&HashMap::new(), &pl).await.unwrap();
last_id = assert_reply(&last_id, "ack", &pl.encode_to_vec());
last_id = assert_reply(&last_id, "ack", &pl.encode_to_vec()).await;
// txack
let pl = integration::TxAckEvent {
@ -186,7 +186,7 @@ pub mod test {
..Default::default()
};
i.txack_event(&HashMap::new(), &pl).await.unwrap();
last_id = assert_reply(&last_id, "txack", &pl.encode_to_vec());
last_id = assert_reply(&last_id, "txack", &pl.encode_to_vec()).await;
// log
let pl = integration::LogEvent {
@ -198,7 +198,7 @@ pub mod test {
..Default::default()
};
i.log_event(&HashMap::new(), &pl).await.unwrap();
last_id = assert_reply(&last_id, "log", &pl.encode_to_vec());
last_id = assert_reply(&last_id, "log", &pl.encode_to_vec()).await;
// status
let pl = integration::StatusEvent {
@ -210,7 +210,7 @@ pub mod test {
..Default::default()
};
i.status_event(&HashMap::new(), &pl).await.unwrap();
last_id = assert_reply(&last_id, "status", &pl.encode_to_vec());
last_id = assert_reply(&last_id, "status", &pl.encode_to_vec()).await;
// location
let pl = integration::LocationEvent {
@ -222,7 +222,7 @@ pub mod test {
..Default::default()
};
i.location_event(&HashMap::new(), &pl).await.unwrap();
last_id = assert_reply(&last_id, "location", &pl.encode_to_vec());
last_id = assert_reply(&last_id, "location", &pl.encode_to_vec()).await;
// integration
let pl = integration::IntegrationEvent {
@ -234,18 +234,19 @@ pub mod test {
..Default::default()
};
i.integration_event(&HashMap::new(), &pl).await.unwrap();
let _ = assert_reply(&last_id, "integration", &pl.encode_to_vec());
let _ = assert_reply(&last_id, "integration", &pl.encode_to_vec()).await;
}
fn assert_reply(last_id: &str, event: &str, b: &[u8]) -> String {
let mut c = get_redis_conn().unwrap();
async fn assert_reply(last_id: &str, event: &str, b: &[u8]) -> String {
let mut c = get_async_redis_conn().await.unwrap();
let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT")
.arg(1 as usize)
.arg("STREAMS")
.arg("device:stream:event")
.arg(&last_id)
.query(&mut *c)
.query_async(&mut c)
.await
.unwrap();
assert_eq!(1, srr.keys.len());

View File

@ -2,60 +2,46 @@ use std::io::Cursor;
use anyhow::{Context, Result};
use prost::Message;
use tokio::task;
use tracing::info;
use super::{error::Error, get_redis_conn, redis_key};
use super::{error::Error, get_async_redis_conn, redis_key};
use crate::config;
use chirpstack_api::internal;
use lrwn::EUI64;
pub async fn save_rx_info(rx_info: &internal::DeviceGatewayRxInfo) -> Result<()> {
let dev_eui = EUI64::from_slice(&rx_info.dev_eui)?;
task::spawn_blocking({
let rx_info = rx_info.clone();
move || -> Result<()> {
let conf = config::get();
let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui));
let ttl = conf.network.device_session_ttl.as_millis() as usize;
let b = rx_info.encode_to_vec();
let mut c = get_redis_conn()?;
let conf = config::get();
let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui));
let ttl = conf.network.device_session_ttl.as_millis() as usize;
let b = rx_info.encode_to_vec();
let mut c = get_async_redis_conn().await?;
redis::cmd("PSETEX")
.arg(key)
.arg(ttl)
.arg(b)
.query(&mut *c)?;
Ok(())
}
})
.await??;
redis::cmd("PSETEX")
.arg(key)
.arg(ttl)
.arg(b)
.query_async(&mut c)
.await?;
info!(dev_eui = %dev_eui, "Gateway rx-info saved");
Ok(())
}
pub async fn get_rx_info(dev_eui: &EUI64) -> Result<internal::DeviceGatewayRxInfo, Error> {
task::spawn_blocking({
let dev_eui = *dev_eui;
move || -> Result<internal::DeviceGatewayRxInfo, Error> {
let mut c = get_redis_conn()?;
let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui));
let mut c = get_async_redis_conn().await?;
let key = redis_key(format!("device:{{{}}}:gwrx", dev_eui));
let b: Vec<u8> = redis::cmd("GET")
.arg(key)
.query(&mut *c)
.context("Get rx-info")?;
if b.is_empty() {
return Err(Error::NotFound(dev_eui.to_string()));
}
let b: Vec<u8> = redis::cmd("GET")
.arg(key)
.query_async(&mut c)
.await
.context("Get rx-info")?;
if b.is_empty() {
return Err(Error::NotFound(dev_eui.to_string()));
}
Ok(internal::DeviceGatewayRxInfo::decode(&mut Cursor::new(b))
.context("Decode rx-info")?)
}
})
.await?
Ok(internal::DeviceGatewayRxInfo::decode(&mut Cursor::new(b)).context("Decode rx-info")?)
}
pub async fn get_rx_info_for_dev_euis(
@ -65,34 +51,28 @@ pub async fn get_rx_info_for_dev_euis(
return Ok(Vec::new());
}
task::spawn_blocking({
let dev_euis = dev_euis.to_vec();
move || -> Result<Vec<internal::DeviceGatewayRxInfo>, Error> {
let mut c = get_redis_conn()?;
let mut keys: Vec<String> = Vec::new();
for dev_eui in &dev_euis {
keys.push(redis_key(format!("device:{{{}}}:gwrx", dev_eui)));
}
let mut c = get_async_redis_conn().await?;
let mut keys: Vec<String> = Vec::new();
for dev_eui in dev_euis {
keys.push(redis_key(format!("device:{{{}}}:gwrx", dev_eui)));
}
let bb: Vec<Vec<u8>> = redis::cmd("MGET")
.arg(keys)
.query(&mut *c)
.context("MGET")?;
let mut out: Vec<internal::DeviceGatewayRxInfo> = Vec::new();
for b in bb {
if b.is_empty() {
continue;
}
out.push(
internal::DeviceGatewayRxInfo::decode(&mut Cursor::new(b))
.context("Decode rx-info")?,
);
}
Ok(out)
let bb: Vec<Vec<u8>> = redis::cmd("MGET")
.arg(keys)
.query_async(&mut c)
.await
.context("MGET")?;
let mut out: Vec<internal::DeviceGatewayRxInfo> = Vec::new();
for b in bb {
if b.is_empty() {
continue;
}
})
.await?
out.push(
internal::DeviceGatewayRxInfo::decode(&mut Cursor::new(b)).context("Decode rx-info")?,
);
}
Ok(out)
}
#[cfg(test)]

View File

@ -3,11 +3,10 @@ use std::io::Cursor;
use anyhow::{Context, Result};
use prost::Message;
use tokio::task;
use tracing::{error, info, trace};
use super::error::Error;
use super::{get_redis_conn, redis_key};
use super::{get_async_redis_conn, redis_key};
use crate::api::helpers::FromProto;
use crate::config;
use crate::helpers::errors::PrintFullError;
@ -24,96 +23,79 @@ pub async fn save(ds: &internal::DeviceSession) -> Result<()> {
let eui = EUI64::from_slice(&ds.dev_eui)?;
let addr = DevAddr::from_slice(&ds.dev_addr)?;
task::spawn_blocking({
let ds = ds.clone();
move || -> Result<()> {
let conf = config::get();
let addr_key = redis_key(format!("devaddr:{{{}}}", addr));
let ds_key = redis_key(format!("device:{{{}}}:ds", eui));
let b = ds.encode_to_vec();
let ttl = conf.network.device_session_ttl.as_millis() as usize;
let mut c = get_redis_conn()?;
let conf = config::get();
let addr_key = redis_key(format!("devaddr:{{{}}}", addr));
let ds_key = redis_key(format!("device:{{{}}}:ds", eui));
let b = ds.encode_to_vec();
let ttl = conf.network.device_session_ttl.as_millis() as usize;
let mut c = get_async_redis_conn().await?;
// Atomic add and pexpire.
c.new_pipeline()
.atomic()
.cmd("SADD")
.arg(&addr_key)
.arg(&eui.to_be_bytes())
.ignore()
.cmd("PEXPIRE")
.arg(&addr_key)
.arg(ttl)
.ignore()
.query(&mut c)?;
// Atomic add and pexpire.
redis::pipe()
.atomic()
.cmd("SADD")
.arg(&addr_key)
.arg(&eui.to_be_bytes())
.ignore()
.cmd("PEXPIRE")
.arg(&addr_key)
.arg(ttl)
.ignore()
.query_async(&mut c)
.await?;
// In case there is a pending rejoin session, make sure that the new
// DevAddr also resolves to the device-session.
if let Some(pending_ds) = &ds.pending_rejoin_device_session {
let pending_addr = DevAddr::from_slice(&pending_ds.dev_addr)?;
let pending_addr_key = redis_key(format!("devaddr:{{{}}}", pending_addr));
// In case there is a pending rejoin session, make sure that the new
// DevAddr also resolves to the device-session.
if let Some(pending_ds) = &ds.pending_rejoin_device_session {
let pending_addr = DevAddr::from_slice(&pending_ds.dev_addr)?;
let pending_addr_key = redis_key(format!("devaddr:{{{}}}", pending_addr));
c.new_pipeline()
.atomic()
.cmd("SADD")
.arg(&pending_addr_key)
.arg(&eui.to_be_bytes())
.ignore()
.cmd("PEXPIRE")
.arg(&pending_addr_key)
.arg(ttl)
.ignore()
.query(&mut c)?;
}
redis::pipe()
.atomic()
.cmd("SADD")
.arg(&pending_addr_key)
.arg(&eui.to_be_bytes())
.ignore()
.cmd("PEXPIRE")
.arg(&pending_addr_key)
.arg(ttl)
.ignore()
.query_async(&mut c)
.await?;
}
redis::cmd("PSETEX")
.arg(ds_key)
.arg(ttl)
.arg(b)
.query(&mut *c)?;
Ok(())
}
})
.await??;
redis::cmd("PSETEX")
.arg(ds_key)
.arg(ttl)
.arg(b)
.query_async(&mut c)
.await?;
info!(dev_eui = %eui, dev_addr = %addr, "Device-session saved");
Ok(())
}
pub async fn get(dev_eui: &EUI64) -> Result<chirpstack_api::internal::DeviceSession, Error> {
task::spawn_blocking({
let dev_eui = *dev_eui;
move || -> Result<chirpstack_api::internal::DeviceSession, Error> {
let key = redis_key(format!("device:{{{}}}:ds", dev_eui));
let mut c = get_redis_conn()?;
let v: Vec<u8> = redis::cmd("GET")
.arg(key)
.query(&mut *c)
.context("Get device-session")?;
if v.is_empty() {
return Err(Error::NotFound(dev_eui.to_string()));
}
let ds = chirpstack_api::internal::DeviceSession::decode(&mut Cursor::new(v))
.context("Decode device-session")?;
Ok(ds)
}
})
.await?
let key = redis_key(format!("device:{{{}}}:ds", dev_eui));
let mut c = get_async_redis_conn().await?;
let v: Vec<u8> = redis::cmd("GET")
.arg(key)
.query_async(&mut c)
.await
.context("Get device-session")?;
if v.is_empty() {
return Err(Error::NotFound(dev_eui.to_string()));
}
let ds = chirpstack_api::internal::DeviceSession::decode(&mut Cursor::new(v))
.context("Decode device-session")?;
Ok(ds)
}
pub async fn delete(dev_eui: &EUI64) -> Result<()> {
task::spawn_blocking({
let dev_eui = *dev_eui;
move || -> Result<()> {
let key = redis_key(format!("device:{{{}}}:ds", dev_eui));
let mut c = get_redis_conn()?;
redis::cmd("DEL").arg(&key).query(&mut *c)?;
let key = redis_key(format!("device:{{{}}}:ds", dev_eui));
let mut c = get_async_redis_conn().await?;
redis::cmd("DEL").arg(&key).query_async(&mut c).await?;
Ok(())
}
})
.await??;
info!(dev_eui = %dev_eui, "Device-session deleted");
Ok(())
}
@ -204,7 +186,7 @@ pub async fn get_for_phypayload_and_incr_f_cnt_up(
// Make sure that in case of concurrent calls for the same uplink only one will
// pass. Either the concurrent call would read the incremented uplink frame-counter
// or it is unable to acquire the lock.
let mut c = get_redis_conn()?;
let mut c = get_async_redis_conn().await?;
let lock_key = redis_key(format!(
"device:{{{}}}:ds:lock:{}",
hex::encode(&ds.dev_eui),
@ -216,7 +198,8 @@ pub async fn get_for_phypayload_and_incr_f_cnt_up(
.arg("EX")
.arg(1_usize)
.arg("NX")
.query(&mut *c)?;
.query_async(&mut c)
.await?;
if !set {
return Ok(ValidationStatus::Retransmission(full_f_cnt, ds));
@ -311,40 +294,31 @@ pub async fn get_for_phypayload(
}
async fn get_dev_euis_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<EUI64>> {
task::spawn_blocking({
move || -> Result<Vec<EUI64>> {
let key = redis_key(format!("devaddr:{{{}}}", dev_addr));
let mut c = get_redis_conn()?;
let dev_euis: HashSet<Vec<u8>> = redis::cmd("SMEMBERS")
.arg(key)
.query(&mut *c)
.context("Get DevEUIs for DevAddr")?;
let key = redis_key(format!("devaddr:{{{}}}", dev_addr));
let mut c = get_async_redis_conn().await?;
let dev_euis: HashSet<Vec<u8>> = redis::cmd("SMEMBERS")
.arg(key)
.query_async(&mut c)
.await
.context("Get DevEUIs for DevAddr")?;
let mut out = Vec::new();
for dev_eui in &dev_euis {
out.push(EUI64::from_slice(dev_eui)?);
}
Ok(out)
}
})
.await?
let mut out = Vec::new();
for dev_eui in &dev_euis {
out.push(EUI64::from_slice(dev_eui)?);
}
Ok(out)
}
async fn remove_dev_eui_from_dev_addr_set(dev_addr: DevAddr, dev_eui: EUI64) -> Result<()> {
task::spawn_blocking({
move || -> Result<()> {
let key = redis_key(format!("devaddr:{{{}}}", dev_addr));
let mut c = get_redis_conn()?;
redis::cmd("SREM")
.arg(key)
.arg(&dev_eui.to_be_bytes())
.query(&mut *c)?;
let key = redis_key(format!("devaddr:{{{}}}", dev_addr));
let mut c = get_async_redis_conn().await?;
redis::cmd("SREM")
.arg(key)
.arg(&dev_eui.to_be_bytes())
.query_async(&mut c)
.await?;
Ok(())
}
})
.await?
Ok(())
}
async fn get_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<internal::DeviceSession>> {

View File

@ -2,42 +2,35 @@ use std::io::Cursor;
use anyhow::Result;
use prost::Message;
use tokio::task;
use tracing::info;
use super::{error::Error, get_redis_conn, redis_key};
use super::{error::Error, get_async_redis_conn, redis_key};
use chirpstack_api::internal;
pub async fn save(df: &internal::DownlinkFrame) -> Result<()> {
task::spawn_blocking({
let df = df.clone();
move || -> Result<()> {
let b = df.encode_to_vec();
let key = redis_key(format!("frame:{}", df.downlink_id));
let mut c = get_redis_conn()?;
redis::cmd("SETEX").arg(key).arg(30).arg(b).query(&mut *c)?;
Ok(())
}
})
.await??;
let b = df.encode_to_vec();
let key = redis_key(format!("frame:{}", df.downlink_id));
let mut c = get_async_redis_conn().await?;
redis::cmd("SETEX")
.arg(key)
.arg(30)
.arg(b)
.query_async(&mut c)
.await?;
info!(downlink_id = df.downlink_id, "Downlink-frame saved");
Ok(())
}
pub async fn get(id: u32) -> Result<internal::DownlinkFrame, Error> {
task::spawn_blocking({
move || -> Result<internal::DownlinkFrame, Error> {
let mut c = get_redis_conn()?;
let key = redis_key(format!("frame:{}", id));
let v: Vec<u8> = redis::cmd("GET").arg(key).query(&mut *c)?;
if v.is_empty() {
return Err(Error::NotFound(format!("{}", id)));
}
let df = internal::DownlinkFrame::decode(&mut Cursor::new(v))?;
Ok(df)
}
})
.await?
let mut c = get_async_redis_conn().await?;
let key = redis_key(format!("frame:{}", id));
let v: Vec<u8> = redis::cmd("GET").arg(key).query_async(&mut c).await?;
if v.is_empty() {
return Err(Error::NotFound(format!("{}", id)));
}
let df = internal::DownlinkFrame::decode(&mut Cursor::new(v))?;
Ok(df)
}
#[cfg(test)]

View File

@ -1,74 +1,55 @@
use anyhow::Result;
use tokio::task;
use tracing::info;
use super::{get_redis_conn, redis_key};
use super::{get_async_redis_conn, redis_key};
use crate::config;
use lrwn::EUI64;
pub async fn set_pending(dev_eui: &EUI64, cid: lrwn::CID, set: &lrwn::MACCommandSet) -> Result<()> {
task::spawn_blocking({
let dev_eui = *dev_eui;
let set = set.clone();
move || -> Result<()> {
let conf = config::get();
let mut c = get_redis_conn()?;
let conf = config::get();
let mut c = get_async_redis_conn().await?;
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
let ttl = conf.network.device_session_ttl.as_millis() as usize;
let b = set.to_vec()?;
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
let ttl = conf.network.device_session_ttl.as_millis() as usize;
let b = set.to_vec()?;
redis::cmd("PSETEX")
.arg(key)
.arg(ttl)
.arg(b)
.query_async(&mut c)
.await?;
redis::cmd("PSETEX")
.arg(key)
.arg(ttl)
.arg(b)
.query(&mut *c)?;
Ok(())
}
})
.await??;
info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block set");
Ok(())
}
pub async fn get_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<Option<lrwn::MACCommandSet>> {
task::spawn_blocking({
let dev_eui = *dev_eui;
move || -> Result<Option<lrwn::MACCommandSet>> {
let mut c = get_redis_conn()?;
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
let b: Vec<u8> = redis::cmd("GET").arg(key).query(&mut *c)?;
let mut c = get_async_redis_conn().await?;
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
let b: Vec<u8> = redis::cmd("GET").arg(key).query_async(&mut c).await?;
let out = if !b.is_empty() {
let mut mac = lrwn::MACCommandSet::from_slice(&b);
let out = if !b.is_empty() {
let mut mac = lrwn::MACCommandSet::from_slice(&b);
// By definition, the uplink flag is set to false as this function is intended to retrieve
// pending mac-commands that were previously sent to the device.
mac.decode_from_raw(false)?;
// By definition, the uplink flag is set to false as this function is intended to retrieve
// pending mac-commands that were previously sent to the device.
mac.decode_from_raw(false)?;
Some(mac)
} else {
None
};
Some(mac)
} else {
None
};
Ok(out)
}
})
.await?
Ok(out)
}
pub async fn delete_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<()> {
task::spawn_blocking({
let dev_eui = *dev_eui;
move || -> Result<()> {
let mut c = get_redis_conn()?;
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
let mut c = get_async_redis_conn().await?;
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
redis::cmd("DEL").arg(key).query_async(&mut c).await?;
redis::cmd("DEL").arg(key).query(&mut *c)?;
Ok(())
}
})
.await??;
info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block deleted");
Ok(())
}

View File

@ -5,10 +5,9 @@ use std::time::Duration;
use anyhow::Result;
use chrono::{DateTime, Datelike, Duration as ChronoDuration, Local, TimeZone, Timelike};
use serde::{Deserialize, Serialize};
use tokio::task;
use tracing::info;
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
#[allow(clippy::upper_case_acronyms)]
#[allow(non_camel_case_types)]
@ -69,23 +68,16 @@ fn get_key(name: &str, a: Aggregation, dt: DateTime<Local>) -> String {
}
pub async fn save_state(name: &str, state: &str) -> Result<()> {
task::spawn_blocking({
let key = redis_key(format!("metrics:{{{}}}", name));
let state = state.to_string();
let ttl = get_ttl(Aggregation::MONTH);
let key = redis_key(format!("metrics:{{{}}}", name));
let ttl = get_ttl(Aggregation::MONTH);
move || -> Result<()> {
let mut c = get_redis_conn()?;
redis::cmd("PSETEX")
.arg(key)
.arg(ttl.as_millis() as usize)
.arg(state)
.query(&mut *c)?;
Ok(())
}
})
.await??;
let mut c = get_async_redis_conn().await?;
redis::cmd("PSETEX")
.arg(key)
.arg(ttl.as_millis() as usize)
.arg(state)
.query_async(&mut c)
.await?;
info!(state = %state, "State saved");
Ok(())
@ -104,90 +96,78 @@ async fn save_for_interval(a: Aggregation, name: &str, record: &Record) -> Resul
return Ok(());
}
task::spawn_blocking({
let name = name.to_string();
let record = record.clone();
let ttl = get_ttl(a);
let ttl = get_ttl(a);
let ts: DateTime<Local> = match a {
Aggregation::HOUR => Local
.with_ymd_and_hms(
record.time.year(),
record.time.month(),
record.time.day(),
record.time.hour(),
0,
0,
)
.unwrap(),
Aggregation::DAY => Local
.with_ymd_and_hms(
record.time.year(),
record.time.month(),
record.time.day(),
0,
0,
0,
)
.unwrap(),
Aggregation::MONTH => Local
.with_ymd_and_hms(record.time.year(), record.time.month(), 1, 0, 0, 0)
.unwrap(),
};
let ts: DateTime<Local> = match a {
Aggregation::HOUR => Local
.with_ymd_and_hms(
record.time.year(),
record.time.month(),
record.time.day(),
record.time.hour(),
0,
0,
)
.unwrap(),
Aggregation::DAY => Local
.with_ymd_and_hms(
record.time.year(),
record.time.month(),
record.time.day(),
0,
0,
0,
)
.unwrap(),
Aggregation::MONTH => Local
.with_ymd_and_hms(record.time.year(), record.time.month(), 1, 0, 0, 0)
.unwrap(),
};
move || -> Result<()> {
let mut c = get_redis_conn()?;
let key = get_key(&name, a, ts);
let mut pipe = c.new_pipeline();
pipe.atomic();
let mut c = get_async_redis_conn().await?;
let key = get_key(&name, a, ts);
let mut pipe = redis::pipe();
pipe.atomic();
for (k, v) in &record.metrics {
// Passing a reference to hincr will return a runtime error.
let k = k.clone();
let v = *v;
for (k, v) in &record.metrics {
// Passing a reference to hincr will return a runtime error.
let k = k.clone();
let v = *v;
match record.kind {
Kind::COUNTER => {
pipe.cmd("HSET").arg(&key).arg(k).arg(v).ignore();
}
Kind::ABSOLUTE => {
pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore();
}
Kind::GAUGE => {
pipe.cmd("HINCRBYFLOAT")
.arg(&key)
.arg(format!("_{}_count", k))
.arg(1.0)
.ignore();
pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore();
}
}
match record.kind {
Kind::COUNTER => {
pipe.cmd("HSET").arg(&key).arg(k).arg(v).ignore();
}
Kind::ABSOLUTE => {
pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore();
}
Kind::GAUGE => {
pipe.cmd("HINCRBYFLOAT")
.arg(&key)
.arg(format!("_{}_count", k))
.arg(1.0)
.ignore();
pipe.cmd("HINCRBYFLOAT").arg(&key).arg(k).arg(v).ignore();
}
pipe.cmd("PEXPIRE")
.arg(&key)
.arg(ttl.as_millis() as usize)
.ignore()
.query(&mut c)?;
Ok(())
}
})
.await??;
}
pipe.cmd("PEXPIRE")
.arg(&key)
.arg(ttl.as_millis() as usize)
.ignore()
.query_async(&mut c)
.await?;
info!(name = %name, aggregation = %a, "Metrics saved");
Ok(())
}
pub async fn get_state(name: &str) -> Result<String> {
task::spawn_blocking({
let key = redis_key(format!("metrics:{{{}}}", name));
move || -> Result<String> {
let mut c = get_redis_conn()?;
let v: Option<String> = redis::cmd("GET").arg(key).query(&mut *c)?;
Ok(v.unwrap_or_default())
}
})
.await?
let key = redis_key(format!("metrics:{{{}}}", name));
let mut c = get_async_redis_conn().await?;
let v: Option<String> = redis::cmd("GET").arg(key).query_async(&mut c).await?;
Ok(v.unwrap_or_default())
}
pub async fn get(
@ -278,54 +258,48 @@ pub async fn get(
return Ok(Vec::new());
}
task::spawn_blocking({
let keys = keys.clone();
move || -> Result<Vec<Record>> {
let mut c = get_redis_conn()?;
let mut pipe = c.new_pipeline();
let mut c = get_async_redis_conn().await?;
let mut pipe = redis::pipe();
for k in &keys {
pipe.cmd("HGETALL").arg(k);
}
for k in &keys {
pipe.cmd("HGETALL").arg(k);
}
let res: Vec<HashMap<String, f64>> = pipe.query(&mut c)?;
let mut out: Vec<Record> = Vec::new();
let res: Vec<HashMap<String, f64>> = pipe.query_async(&mut c).await?;
let mut out: Vec<Record> = Vec::new();
for (i, r) in res.iter().enumerate() {
let mut metrics = r.clone();
for (i, r) in res.iter().enumerate() {
let mut metrics = r.clone();
// In case of GAUGE values, the total aggregated value must be divided by the
// number of measurements.
if kind == Kind::GAUGE {
let counts: HashMap<String, f64> = r
.iter()
.filter(|(k, _)| k.starts_with('_') && k.ends_with("_count"))
.map(|(k, v)| (k.to_string(), *v))
.collect();
// In case of GAUGE values, the total aggregated value must be divided by the
// number of measurements.
if kind == Kind::GAUGE {
let counts: HashMap<String, f64> = r
.iter()
.filter(|(k, _)| k.starts_with('_') && k.ends_with("_count"))
.map(|(k, v)| (k.to_string(), *v))
.collect();
for (k, count) in counts {
let k = k.strip_prefix('_').unwrap().strip_suffix("_count").unwrap();
if let Some(v) = metrics.get_mut(k) {
*v /= count;
}
}
for (k, count) in counts {
let k = k.strip_prefix('_').unwrap().strip_suffix("_count").unwrap();
if let Some(v) = metrics.get_mut(k) {
*v /= count;
}
out.push(Record {
time: timestamps[i],
kind,
metrics: metrics
.iter()
.filter(|(k, _)| !k.starts_with('_'))
.map(|(k, v)| (k.to_string(), *v))
.collect(),
});
}
Ok(out)
}
})
.await?
out.push(Record {
time: timestamps[i],
kind,
metrics: metrics
.iter()
.filter(|(k, _)| !k.starts_with('_'))
.map(|(k, v)| (k.to_string(), *v))
.collect(),
});
}
Ok(out)
}
#[cfg(test)]
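
For GAUGE metrics the refactored save_for_interval keeps the old behaviour: every report HINCRBYFLOATs both the value and a hidden _<name>_count field, and get() divides the accumulated total by that count. A tiny worked example of the averaging, with made-up numbers.

// Two GAUGE reports of 10.0 and 20.0 for the metric "temperature":
//   HINCRBYFLOAT <key> temperature 10.0, then 20.0      -> stored total 30.0
//   HINCRBYFLOAT <key> _temperature_count 1.0 (twice)   -> stored count 2.0
// get() then reports 30.0 / 2.0 = 15.0 for "temperature".
fn main() {
    let total = 10.0_f64 + 20.0_f64; // accumulated value
    let count = 2.0_f64;             // number of measurements
    assert_eq!(total / count, 15.0);
}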

View File

@ -1,6 +1,5 @@
use std::fs::File;
use std::io::BufReader;
use std::ops::{Deref, DerefMut};
use std::sync::RwLock;
use anyhow::Context;
@ -13,7 +12,8 @@ use diesel_async::AsyncPgConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use r2d2::{Pool, PooledConnection};
use redis::aio::ConnectionLike;
use tokio::sync::RwLock as TokioRwLock;
use tokio::task;
use tracing::{error, info};
@ -48,129 +48,48 @@ pub type AsyncPgPoolConnection = DeadpoolObject<AsyncPgConnection>;
lazy_static! {
static ref ASYNC_PG_POOL: RwLock<Option<AsyncPgPool>> = RwLock::new(None);
static ref REDIS_POOL: RwLock<Option<RedisPool>> = RwLock::new(None);
static ref ASYNC_REDIS_POOL: TokioRwLock<Option<AsyncRedisPool>> = TokioRwLock::new(None);
static ref REDIS_PREFIX: RwLock<String> = RwLock::new("".to_string());
}
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
pub enum RedisPool {
Client(Pool<redis::Client>),
ClusterClient(Pool<redis::cluster::ClusterClient>),
#[derive(Clone)]
pub enum AsyncRedisPool {
Client(deadpool_redis::Pool),
ClusterClient(deadpool_redis_cluster::Pool),
}
pub enum RedisPoolConnection {
Client(PooledConnection<redis::Client>),
ClusterClient(PooledConnection<redis::cluster::ClusterClient>),
pub enum AsyncRedisPoolConnection {
Client(deadpool_redis::Connection),
ClusterClient(deadpool_redis_cluster::Connection),
}
impl RedisPoolConnection {
pub fn new_pipeline(&self) -> RedisPipeline {
impl ConnectionLike for AsyncRedisPoolConnection {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
match self {
RedisPoolConnection::Client(_) => RedisPipeline::Pipeline(redis::pipe()),
RedisPoolConnection::ClusterClient(_) => {
RedisPipeline::ClusterPipeline(redis::cluster::cluster_pipe())
}
AsyncRedisPoolConnection::Client(v) => v.req_packed_command(cmd),
AsyncRedisPoolConnection::ClusterClient(v) => v.req_packed_command(cmd),
}
}
}
impl Deref for RedisPoolConnection {
type Target = dyn redis::ConnectionLike;
fn deref(&self) -> &Self::Target {
fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
match self {
RedisPoolConnection::Client(v) => v.deref() as &dyn redis::ConnectionLike,
RedisPoolConnection::ClusterClient(v) => v.deref() as &dyn redis::ConnectionLike,
AsyncRedisPoolConnection::Client(v) => v.req_packed_commands(cmd, offset, count),
AsyncRedisPoolConnection::ClusterClient(v) => v.req_packed_commands(cmd, offset, count),
}
}
}
impl DerefMut for RedisPoolConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
fn get_db(&self) -> i64 {
match self {
RedisPoolConnection::Client(v) => v.deref_mut() as &mut dyn redis::ConnectionLike,
RedisPoolConnection::ClusterClient(v) => {
v.deref_mut() as &mut dyn redis::ConnectionLike
}
}
}
}
pub enum RedisPipeline {
Pipeline(redis::Pipeline),
ClusterPipeline(redis::cluster::ClusterPipeline),
}
impl RedisPipeline {
pub fn cmd(&mut self, name: &str) -> &mut Self {
match self {
RedisPipeline::Pipeline(p) => {
p.cmd(name);
}
RedisPipeline::ClusterPipeline(p) => {
p.cmd(name);
}
}
self
}
pub fn arg<T: redis::ToRedisArgs>(&mut self, arg: T) -> &mut Self {
match self {
RedisPipeline::Pipeline(p) => {
p.arg(arg);
}
RedisPipeline::ClusterPipeline(p) => {
p.arg(arg);
}
}
self
}
pub fn ignore(&mut self) -> &mut Self {
match self {
RedisPipeline::Pipeline(p) => {
p.ignore();
}
RedisPipeline::ClusterPipeline(p) => {
p.ignore();
}
}
self
}
pub fn atomic(&mut self) -> &mut Self {
match self {
RedisPipeline::Pipeline(p) => {
p.atomic();
}
RedisPipeline::ClusterPipeline(_) => {
// TODO: ClusterPipeline does not (yet?) provide .atomic() method.
// https://github.com/redis-rs/redis-rs/issues/731
}
}
self
}
pub fn query<T: redis::FromRedisValue>(
&mut self,
con: &mut RedisPoolConnection,
) -> redis::RedisResult<T> {
match self {
RedisPipeline::Pipeline(p) => {
if let RedisPoolConnection::Client(c) = con {
p.query(&mut **c)
} else {
panic!("Mismatch between RedisPipeline and RedisPoolConnection")
}
}
RedisPipeline::ClusterPipeline(p) => {
if let RedisPoolConnection::ClusterClient(c) = con {
p.query(c)
} else {
panic!("Mismatch between RedisPipeline and RedisPoolConnection")
}
}
AsyncRedisPoolConnection::Client(v) => v.get_db(),
AsyncRedisPoolConnection::ClusterClient(v) => v.get_db(),
}
}
}
@ -194,29 +113,17 @@ pub async fn setup() -> Result<()> {
info!("Setting up Redis client");
if conf.redis.cluster {
let client = redis::cluster::ClusterClientBuilder::new(conf.redis.servers.clone())
.build()
.context("ClusterClient open")?;
let pool: r2d2::Pool<redis::cluster::ClusterClient> = r2d2::Pool::builder()
.max_size(conf.redis.max_open_connections)
.min_idle(match conf.redis.min_idle_connections {
0 => None,
_ => Some(conf.redis.min_idle_connections),
})
.build(client)
.context("Building Redis pool")?;
set_redis_pool(RedisPool::ClusterClient(pool));
let pool = deadpool_redis_cluster::Config::from_urls(conf.redis.servers.clone())
.builder()?
.max_size(conf.redis.max_open_connections as usize)
.build()?;
set_async_redis_pool(AsyncRedisPool::ClusterClient(pool)).await;
} else {
let client = redis::Client::open(conf.redis.servers[0].clone()).context("Redis client")?;
let pool: r2d2::Pool<redis::Client> = r2d2::Pool::builder()
.max_size(conf.redis.max_open_connections)
.min_idle(match conf.redis.min_idle_connections {
0 => None,
_ => Some(conf.redis.min_idle_connections),
})
.build(client)
.context("Building Redis pool")?;
set_redis_pool(RedisPool::Client(pool));
let pool = deadpool_redis::Config::from_url(conf.redis.servers[0].clone())
.builder()?
.max_size(conf.redis.max_open_connections as usize)
.build()?;
set_async_redis_pool(AsyncRedisPool::Client(pool)).await;
}
if !conf.redis.key_prefix.is_empty() {
@ -289,14 +196,23 @@ pub async fn get_async_db_conn() -> Result<AsyncPgPoolConnection> {
Ok(pool.get().await?)
}
pub fn get_redis_conn() -> Result<RedisPoolConnection> {
let pool_r = REDIS_POOL.read().unwrap();
let pool = pool_r
async fn get_async_redis_pool() -> Result<AsyncRedisPool> {
let pool_r = ASYNC_REDIS_POOL.read().await;
let pool: AsyncRedisPool = pool_r
.as_ref()
.ok_or_else(|| anyhow!("Redis connection pool is not initialized (yet)"))?;
.ok_or_else(|| anyhow!("Redis connection pool is not initialized"))?
.clone();
Ok(pool)
}
pub async fn get_async_redis_conn() -> Result<AsyncRedisPoolConnection> {
let pool = get_async_redis_pool().await?;
Ok(match pool {
RedisPool::Client(v) => RedisPoolConnection::Client(v.get()?),
RedisPool::ClusterClient(v) => RedisPoolConnection::ClusterClient(v.get()?),
AsyncRedisPool::Client(v) => AsyncRedisPoolConnection::Client(v.get().await?),
AsyncRedisPool::ClusterClient(v) => {
AsyncRedisPoolConnection::ClusterClient(v.clone().get().await?)
}
})
}
@ -322,8 +238,8 @@ pub async fn run_db_migrations() -> Result<()> {
.await?
}
pub fn set_redis_pool(p: RedisPool) {
let mut pool_w = REDIS_POOL.write().unwrap();
async fn set_async_redis_pool(p: AsyncRedisPool) {
let mut pool_w = ASYNC_REDIS_POOL.write().await;
*pool_w = Some(p);
}
@ -353,8 +269,8 @@ pub async fn reset_db() -> Result<()> {
#[cfg(test)]
pub async fn reset_redis() -> Result<()> {
let mut c = get_redis_conn()?;
redis::cmd("FLUSHDB").query(&mut *c)?;
let mut c = get_async_redis_conn().await?;
redis::cmd("FLUSHDB").query_async(&mut c).await?;
Ok(())
}
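
The new AsyncRedisPoolConnection enum implements redis::aio::ConnectionLike by delegating to the wrapped deadpool connection, which is what lets every call site in this commit pass it straight to query_async regardless of whether a single-node or a cluster pool is configured. A minimal sketch of a generic helper built on that bound; the ping function is hypothetical, not part of the commit.

use redis::aio::ConnectionLike;

// Works with deadpool_redis::Connection, deadpool_redis_cluster::Connection,
// or the AsyncRedisPoolConnection enum above, since all implement ConnectionLike.
async fn ping<C: ConnectionLike>(c: &mut C) -> redis::RedisResult<String> {
    redis::cmd("PING").query_async(c).await
}

// Usage (sketch): let mut c = get_async_redis_conn().await?;
//                 let pong: String = ping(&mut c).await?;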

View File

@ -4,12 +4,11 @@ use std::str::FromStr;
use anyhow::{Context, Result};
use chrono::{DateTime, Duration, Utc};
use prost::Message;
use tokio::task;
use tracing::{debug, info};
use uuid::Uuid;
use super::error::Error;
use super::{get_redis_conn, redis_key};
use super::{get_async_redis_conn, redis_key};
use crate::config;
use chirpstack_api::internal;
use lrwn::{AES128Key, DevAddr, EUI64};
@ -37,59 +36,52 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> {
return Ok(());
}
task::spawn_blocking({
let ds = ds.clone();
move || -> Result<()> {
let conf = config::get();
let conf = config::get();
let dev_addr_key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr));
let dev_eui_key = redis_key(format!("pr:dev:{{{}}}", dev_eui));
let sess_key = redis_key(format!("pr:sess:{{{}}}", sess_id));
let b = ds.encode_to_vec();
let ttl = conf.network.device_session_ttl.as_millis() as usize;
let pr_ttl = lifetime.num_milliseconds() as usize;
let dev_addr_key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr));
let dev_eui_key = redis_key(format!("pr:dev:{{{}}}", dev_eui));
let sess_key = redis_key(format!("pr:sess:{{{}}}", sess_id));
let b = ds.encode_to_vec();
let ttl = conf.network.device_session_ttl.as_millis() as usize;
let pr_ttl = lifetime.num_milliseconds() as usize;
let mut c = get_redis_conn()?;
let mut c = get_async_redis_conn().await?;
// We need to store a pointer from both the DevAddr and DevEUI to the
// passive-roaming device-session ID. This is needed:
// * Because the DevAddr is not guaranteed to be unique
// * Because the DevEUI might not be given (thus is also not guaranteed
// to be a unique identifier).
//
// But:
// * We need to be able to look up the session using the DevAddr (potentially
// using the MIC validation).
// * We need to be able to stop a passive-roaming session given a DevEUI.
c.new_pipeline()
.atomic()
.cmd("SADD")
.arg(&dev_addr_key)
.arg(&sess_id.to_string())
.ignore()
.cmd("SADD")
.arg(&dev_eui_key)
.arg(&sess_id.to_string())
.ignore()
.cmd("PEXPIRE")
.arg(&dev_addr_key)
.arg(ttl)
.ignore()
.cmd("PEXPIRE")
.arg(&dev_eui_key)
.arg(ttl)
.ignore()
.cmd("PSETEX")
.arg(&sess_key)
.arg(pr_ttl)
.arg(b)
.ignore()
.query(&mut c)?;
Ok(())
}
})
.await??;
// We need to store a pointer from both the DevAddr and DevEUI to the
// passive-roaming device-session ID. This is needed:
// * Because the DevAddr is not guaranteed to be unique
// * Because the DevEUI might not be given (thus is also not guaranteed
// to be a unique identifier).
//
// But:
// * We need to be able to look up the session using the DevAddr (potentially
// using the MIC validation).
// * We need to be able to stop a passive-roaming session given a DevEUI.
redis::pipe()
.atomic()
.cmd("SADD")
.arg(&dev_addr_key)
.arg(&sess_id.to_string())
.ignore()
.cmd("SADD")
.arg(&dev_eui_key)
.arg(&sess_id.to_string())
.ignore()
.cmd("PEXPIRE")
.arg(&dev_addr_key)
.arg(ttl)
.ignore()
.cmd("PEXPIRE")
.arg(&dev_eui_key)
.arg(ttl)
.ignore()
.cmd("PSETEX")
.arg(&sess_key)
.arg(pr_ttl)
.arg(b)
.ignore()
.query_async(&mut c)
.await?;
info!(id = %sess_id, "Passive-roaming device-session saved");
@ -97,35 +89,26 @@ pub async fn save(ds: &internal::PassiveRoamingDeviceSession) -> Result<()> {
}
pub async fn get(id: Uuid) -> Result<internal::PassiveRoamingDeviceSession, Error> {
task::spawn_blocking({
move || -> Result<internal::PassiveRoamingDeviceSession, Error> {
let key = redis_key(format!("pr:sess:{{{}}}", id));
let mut c = get_redis_conn()?;
let v: Vec<u8> = redis::cmd("GET")
.arg(key)
.query(&mut *c)
.context("Get passive-roaming device-session")?;
if v.is_empty() {
return Err(Error::NotFound(id.to_string()));
}
let ds = internal::PassiveRoamingDeviceSession::decode(&mut Cursor::new(v))
.context("Decode passive-roaming device-session")?;
Ok(ds)
}
})
.await?
let key = redis_key(format!("pr:sess:{{{}}}", id));
let mut c = get_async_redis_conn().await?;
let v: Vec<u8> = redis::cmd("GET")
.arg(key)
.query_async(&mut c)
.await
.context("Get passive-roaming device-session")?;
if v.is_empty() {
return Err(Error::NotFound(id.to_string()));
}
let ds = internal::PassiveRoamingDeviceSession::decode(&mut Cursor::new(v))
.context("Decode passive-roaming device-session")?;
Ok(ds)
}
pub async fn delete(id: Uuid) -> Result<()> {
task::spawn_blocking({
move || -> Result<()> {
let key = redis_key(format!("pr:sess:{{{}}}", id));
let mut c = get_redis_conn()?;
redis::cmd("DEL").arg(&key).query(&mut *c)?;
Ok(())
}
})
.await??;
let key = redis_key(format!("pr:sess:{{{}}}", id));
let mut c = get_async_redis_conn().await?;
redis::cmd("DEL").arg(&key).query_async(&mut c).await?;
info!(id = %id, "Passive-roaming device-session deleted");
Ok(())
}
@ -197,39 +180,29 @@ async fn get_sessions_for_dev_addr(
}
async fn get_session_ids_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<Uuid>> {
task::spawn_blocking({
move || -> Result<Vec<Uuid>> {
let key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr));
let mut c = get_redis_conn()?;
let v: Vec<String> = redis::cmd("SMEMBERS").arg(key).query(&mut *c)?;
let key = redis_key(format!("pr:devaddr:{{{}}}", dev_addr));
let mut c = get_async_redis_conn().await?;
let v: Vec<String> = redis::cmd("SMEMBERS").arg(key).query_async(&mut c).await?;
let mut out: Vec<Uuid> = Vec::new();
for id in &v {
out.push(Uuid::from_str(id)?);
}
let mut out: Vec<Uuid> = Vec::new();
for id in &v {
out.push(Uuid::from_str(id)?);
}
Ok(out)
}
})
.await?
Ok(out)
}
pub async fn get_session_ids_for_dev_eui(dev_eui: EUI64) -> Result<Vec<Uuid>> {
    let key = redis_key(format!("pr:dev:{{{}}}", dev_eui));
    let mut c = get_async_redis_conn().await?;
    let v: Vec<String> = redis::cmd("SMEMBERS").arg(key).query_async(&mut c).await?;

    let mut out: Vec<Uuid> = Vec::new();
    for id in &v {
        out.push(Uuid::from_str(id)?);
    }

    Ok(out)
}
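The DevEUI index exists precisely so that all passive-roaming sessions of a device can be stopped; combining the helper above with delete() gives roughly the following sketch (the function name is illustrative, not part of this commit).

// Illustrative sketch: remove every stored passive-roaming session for a
// device, using the DevEUI index maintained by save() above.
pub async fn delete_for_dev_eui_sketch(dev_eui: EUI64) -> Result<()> {
    for id in get_session_ids_for_dev_eui(dev_eui).await? {
        delete(id).await?;
    }
    Ok(())
}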
fn get_full_f_cnt_up(next_expected_full_fcnt: u32, truncated_f_cnt: u32) -> u32 {

View File

@ -1,38 +1,31 @@
use anyhow::Result;
use prost::Message;
use tokio::task;
use crate::config;
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
use chirpstack_api::stream;
pub async fn log_request(pl: &stream::ApiRequestLog) -> Result<()> {
    let conf = config::get();
    let mut c = get_async_redis_conn().await?;

    if conf.monitoring.api_request_log_max_history == 0 {
        return Ok(());
    }

    let key = redis_key("api:stream:request".to_string());
    let b = pl.encode_to_vec();
    redis::cmd("XADD")
        .arg(&key)
        .arg("MAXLEN")
        .arg(conf.monitoring.api_request_log_max_history)
        .arg("*")
        .arg("request")
        .arg(&b)
        .query_async(&mut c)
        .await?;
    Ok(())
}
#[cfg(test)]
@ -56,7 +49,7 @@ mod tests {
};
log_request(&pl).await.unwrap();
let mut c = get_redis_conn().unwrap();
let mut c = get_async_redis_conn().await.unwrap();
let key = redis_key("api:stream:request".to_string());
let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT")
@ -64,7 +57,8 @@ mod tests {
.arg("STREAMS")
.arg(&key)
.arg("0")
.query(&mut *c)
.query_async(&mut c)
.await
.unwrap();
assert_eq!(1, srr.keys.len());

View File

@ -1,34 +1,52 @@
use anyhow::Result;
use prost::Message;
use tokio::task;
use tokio::sync::mpsc::{self, Sender};
use tracing::error;
use crate::config;
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
use chirpstack_api::stream;
pub async fn get_log_sender() -> Option<Sender<stream::BackendInterfacesRequest>> {
    let conf = config::get();
    if conf.monitoring.backend_interfaces_log_max_history == 0 {
        return None;
    }

    let (tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        while let Some(pl) = rx.recv().await {
            tokio::spawn(async move {
                if let Err(e) = log_request(pl).await {
                    error!(error = %e, "Log request error");
                }
            });
        }
    });

    Some(tx)
}
pub async fn log_request(pl: stream::BackendInterfacesRequest) -> Result<()> {
let conf = config::get();
let mut c = get_async_redis_conn().await?;
if conf.monitoring.backend_interfaces_log_max_history == 0 {
return Ok(());
}
let key = redis_key("backend_interfaces:stream:request".to_string());
let b = pl.encode_to_vec();
redis::cmd("XADD")
.arg(&key)
.arg("MAXLEN")
.arg(conf.monitoring.backend_interfaces_log_max_history)
.arg("*")
.arg("request")
.arg(&b)
.query_async(&mut c)
.await?;
Ok(())
}
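The sender returned by get_log_sender() is the producing end of a bounded channel (capacity 100); each received payload is handed to log_request() on its own task. A minimal, hypothetical producer could look like the sketch below (the function name is illustrative).

// Illustrative sketch: producing into the channel returned by get_log_sender().
// get_log_sender() returns None when backend_interfaces_log_max_history is 0,
// in which case logging is simply skipped.
async fn produce_request_log_sketch(req: stream::BackendInterfacesRequest) {
    if let Some(tx) = get_log_sender().await {
        // send().await applies backpressure once the 100-item buffer is full;
        // an Err here only means the receiver task has stopped.
        let _ = tx.send(req).await;
    }
}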

View File

@ -6,62 +6,55 @@ use anyhow::{Context, Result};
use prost::Message;
use redis::streams::StreamReadReply;
use tokio::sync::mpsc;
use tokio::task;
use tracing::{debug, error, trace};
use crate::config;
use crate::helpers::errors::PrintFullError;
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
use chirpstack_api::{api, integration};
#[allow(clippy::enum_variant_names)]
pub async fn log_event_for_device(typ: &str, dev_eui: &str, b: &[u8]) -> Result<()> {
    let conf = config::get();
    let mut c = get_async_redis_conn().await?;

    // per device stream
    if conf.monitoring.per_device_event_log_max_history > 0 {
        let key = redis_key(format!("device:{{{}}}:stream:event", dev_eui));
        redis::pipe()
            .atomic()
            .cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.per_device_event_log_max_history)
            .arg("*")
            .arg(typ)
            .arg(b)
            .ignore()
            .cmd("PEXPIRE")
            .arg(&key)
            .arg(conf.monitoring.per_device_event_log_ttl.as_millis() as usize)
            .ignore()
            .query_async(&mut c)
            .await?;
    }

    // global device stream
    if conf.monitoring.device_event_log_max_history > 0 {
        let key = redis_key("device:stream:event".to_string());
        redis::cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.device_event_log_max_history)
            .arg("*")
            .arg(typ)
            .arg(b)
            .query_async(&mut c)
            .await?;
    }
    Ok(())
}
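Callers pass the event type as the stream field name together with the already-encoded payload; the field name must match one of the keys handled by get_event_logs() below. A hedged usage sketch (the function name is illustrative):

// Illustrative sketch: logging an uplink event for a device. "up" is one of
// the field names recognised by get_event_logs() ("up", "join", "ack", ...).
async fn log_up_event_sketch(dev_eui: &str, pl: &integration::UplinkEvent) -> Result<()> {
    log_event_for_device("up", dev_eui, &pl.encode_to_vec()).await
}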
pub async fn get_event_logs(
@ -69,245 +62,240 @@ pub async fn get_event_logs(
count: usize,
channel: mpsc::Sender<api::LogItem>,
) -> Result<()> {
task::spawn_blocking({
let key = key.clone();
let channel = channel.clone();
let mut last_id = "0".to_string();
let mut c = get_async_redis_conn().await?;
move || -> Result<()> {
let mut last_id = "0".to_string();
let mut c = get_redis_conn()?;
loop {
if channel.is_closed() {
debug!("Channel has been closed, returning");
return Ok(());
}
loop {
if channel.is_closed() {
debug!("Channel has been closed, returning");
return Ok(());
}
let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT")
.arg(count)
.arg("STREAMS")
.arg(&key)
.arg(&last_id)
.query_async(&mut c)
.await
.context("XREAD event stream")?;
let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT")
.arg(count)
.arg("STREAMS")
.arg(&key)
.arg(&last_id)
.query(&mut *c)
.context("XREAD event stream")?;
for stream_key in &srr.keys {
for stream_id in &stream_key.ids {
last_id = stream_id.id.clone();
for (k, v) in &stream_id.map {
let res = || -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("DR".to_string(), pl.dr.to_string()),
("FPort".to_string(), pl.f_port.to_string()),
("FCnt".to_string(), pl.f_cnt.to_string()),
("Data".to_string(), hex::encode(&pl.data)),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"join" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [("DevAddr".to_string(), pl.dev_addr)]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"ack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::AckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.blocking_send(pl)?;
}
}
"txack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.blocking_send(pl)?;
}
}
"status" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Margin".into(), format!("{}", pl.margin)),
(
"Battery level".into(),
format!("{:.0}%", pl.battery_level),
),
(
"Battery level unavailable".into(),
format!("{}", pl.battery_level_unavailable),
),
(
"External power source".into(),
format!("{}", pl.external_power_source),
),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"log" => {
trace!(key = %k, id =%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LogEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Level".into(), pl.level().into()),
("Code".into(), pl.code().into()),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"location" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LocationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.blocking_send(pl)?;
}
}
"integration" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl =
integration::IntegrationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Integration".into(), pl.integration_name.clone()),
("Event".into(), pl.event_type.clone()),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
_ => {
error!(key = %k, "Unexpected key in in event-log stream");
}
for stream_key in &srr.keys {
for stream_id in &stream_key.ids {
last_id = stream_id.id.clone();
for (k, v) in &stream_id.map {
let res = || -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("DR".to_string(), pl.dr.to_string()),
("FPort".to_string(), pl.f_port.to_string()),
("FCnt".to_string(), pl.f_cnt.to_string()),
("Data".to_string(), hex::encode(&pl.data)),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"join" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [("DevAddr".to_string(), pl.dev_addr)]
.iter()
.cloned()
.collect(),
};
Ok(())
}();
if let Err(e) = res {
// Return in case of channel error, in any other case we just log
// the error.
if e.downcast_ref::<mpsc::error::SendError<api::LogItem>>().is_some() {
return Err(e);
channel.blocking_send(pl)?;
}
}
"ack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::AckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
error!(key = %k, error = %e.full(), "Parsing frame-log error");
channel.blocking_send(pl)?;
}
}
"txack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.blocking_send(pl)?;
}
}
"status" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Margin".into(), format!("{}", pl.margin)),
(
"Battery level".into(),
format!("{:.0}%", pl.battery_level),
),
(
"Battery level unavailable".into(),
format!("{}", pl.battery_level_unavailable),
),
(
"External power source".into(),
format!("{}", pl.external_power_source),
),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"log" => {
trace!(key = %k, id =%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LogEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Level".into(), pl.level().into()),
("Code".into(), pl.code().into()),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"location" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl =
integration::LocationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.blocking_send(pl)?;
}
}
"integration" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl =
integration::IntegrationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Integration".into(), pl.integration_name.clone()),
("Event".into(), pl.event_type.clone()),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
_ => {
error!(key = %k, "Unexpected key in in event-log stream");
}
}
Ok(())
}();
if let Err(e) = res {
// Return in case of channel error, in any other case we just log
// the error.
if e.downcast_ref::<mpsc::error::SendError<api::LogItem>>()
.is_some()
{
return Err(e);
}
error!(key = %k, error = %e.full(), "Parsing frame-log error");
}
}
sleep(Duration::from_secs(1));
}
}
})
.await?
sleep(Duration::from_secs(1));
}
}

View File

@ -8,79 +8,74 @@ use prost::Message;
use redis::streams::StreamReadReply;
use serde_json::json;
use tokio::sync::mpsc;
use tokio::task;
use tracing::{debug, error, trace, warn};
use lrwn::EUI64;
use crate::config;
use crate::helpers::errors::PrintFullError;
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
use chirpstack_api::{api, stream};
pub async fn log_uplink_for_gateways(ufl: &stream::UplinkFrameLog) -> Result<()> {
    let conf = config::get();
    let mut c = get_async_redis_conn().await?;

    for rx_info in &ufl.rx_info {
        let gateway_id = EUI64::from_str(&rx_info.gateway_id)?;

        let ufl_copy = stream::UplinkFrameLog {
            phy_payload: ufl.phy_payload.clone(),
            tx_info: ufl.tx_info.clone(),
            rx_info: vec![rx_info.clone()],
            m_type: ufl.m_type,
            dev_addr: ufl.dev_addr.clone(),
            dev_eui: ufl.dev_eui.clone(),
            time: ufl.time.clone(),
            plaintext_f_opts: ufl.plaintext_f_opts,
            plaintext_frm_payload: ufl.plaintext_frm_payload,
        };

        let b = ufl_copy.encode_to_vec();

        // per gateway stream
        if conf.monitoring.per_gateway_frame_log_max_history > 0 {
            let key = redis_key(format!("gw:{{{}}}:stream:frame", gateway_id));
            redis::pipe()
                .atomic()
                .cmd("XADD")
                .arg(&key)
                .arg("MAXLEN")
                .arg(conf.monitoring.per_gateway_frame_log_max_history)
                .arg("*")
                .arg("up")
                .arg(&b)
                .ignore()
                .cmd("PEXPIRE")
                .arg(&key)
                .arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize)
                .ignore()
                .query_async(&mut c)
                .await?;
        }

        // global gateway stream
        if conf.monitoring.gateway_frame_log_max_history > 0 {
            let key = redis_key("gw:stream:frame".to_string());
            redis::cmd("XADD")
                .arg(&key)
                .arg("MAXLEN")
                .arg(conf.monitoring.gateway_frame_log_max_history)
                .arg("*")
                .arg("up")
                .arg(&b)
                .query_async(&mut c)
                .await?;
        }
    }
    Ok(())
}
pub async fn log_downlink_for_gateway(dfl: &stream::DownlinkFrameLog) -> Result<()> {
@ -88,51 +83,46 @@ pub async fn log_downlink_for_gateway(dfl: &stream::DownlinkFrameLog) -> Result<
return Err(anyhow!("gateway_id must be set"));
}
    let conf = config::get();
    let mut c = get_async_redis_conn().await?;
    let b = dfl.encode_to_vec();

    // per gateway stream
    if conf.monitoring.per_gateway_frame_log_max_history > 0 {
        let key = redis_key(format!("gw:{{{}}}:stream:frame", dfl.gateway_id));
        redis::pipe()
            .atomic()
            .cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.per_gateway_frame_log_max_history)
            .arg("*")
            .arg("down")
            .arg(&b)
            .ignore()
            .cmd("PEXPIRE")
            .arg(&key)
            .arg(conf.monitoring.per_gateway_frame_log_ttl.as_millis() as usize)
            .ignore()
            .query_async(&mut c)
            .await?;
    }

    // global gateway stream
    if conf.monitoring.gateway_frame_log_max_history > 0 {
        let key = redis_key("gw:stream:frame".to_string());
        redis::cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.gateway_frame_log_max_history)
            .arg("*")
            .arg("down")
            .arg(&b)
            .query_async(&mut c)
            .await?;
    }
    Ok(())
}
pub async fn log_uplink_for_device(ufl: &stream::UplinkFrameLog) -> Result<()> {
@ -140,51 +130,47 @@ pub async fn log_uplink_for_device(ufl: &stream::UplinkFrameLog) -> Result<()> {
return Err(anyhow!("dev_eui must be set"));
}
    let conf = config::get();
    let mut c = get_async_redis_conn().await?;
    let b = ufl.encode_to_vec();

    // per device stream
    if conf.monitoring.per_device_frame_log_max_history > 0 {
        let key = redis_key(format!("device:{{{}}}:stream:frame", ufl.dev_eui));
        redis::pipe()
            .atomic()
            .cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.per_device_frame_log_max_history)
            .arg("*")
            .arg("up")
            .arg(&b)
            .ignore()
            .cmd("PEXPIRE")
            .arg(&key)
            .arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize)
            .ignore()
            .query_async(&mut c)
            .await?;
    }

    // global device stream
    if conf.monitoring.device_frame_log_max_history > 0 {
        let key = redis_key("device:stream:frame".to_string());
        redis::cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.device_frame_log_max_history)
            .arg("*")
            .arg("up")
            .arg(&b)
            .query_async(&mut c)
            .await?;
    }
    Ok(())
}
pub async fn log_downlink_for_device(dfl: &stream::DownlinkFrameLog) -> Result<()> {
@ -192,51 +178,47 @@ pub async fn log_downlink_for_device(dfl: &stream::DownlinkFrameLog) -> Result<(
return Err(anyhow!("dev_eui must be set"));
}
    let conf = config::get();
    let mut c = get_async_redis_conn().await?;
    let b = dfl.encode_to_vec();

    // per device stream
    if conf.monitoring.per_device_frame_log_max_history > 0 {
        let key = redis_key(format!("device:{{{}}}:stream:frame", dfl.dev_eui));
        redis::pipe()
            .atomic()
            .cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.per_device_frame_log_max_history)
            .arg("*")
            .arg("down")
            .arg(&b)
            .ignore()
            .cmd("PEXPIRE")
            .arg(&key)
            .arg(conf.monitoring.per_device_frame_log_ttl.as_millis() as usize)
            .ignore()
            .query_async(&mut c)
            .await?;
    }

    // global device stream
    if conf.monitoring.device_frame_log_max_history > 0 {
        let key = redis_key("device:stream:frame".to_string());
        redis::cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.device_frame_log_max_history)
            .arg("*")
            .arg("down")
            .arg(&b)
            .query_async(&mut c)
            .await?;
    }
    Ok(())
}
pub async fn get_frame_logs(
@ -244,143 +226,138 @@ pub async fn get_frame_logs(
count: usize,
channel: mpsc::Sender<api::LogItem>,
) -> Result<()> {
task::spawn_blocking({
let key = key.clone();
let channel = channel.clone();
let mut last_id = "0".to_string();
let mut c = get_async_redis_conn().await?;
move || -> Result<()> {
let mut last_id = "0".to_string();
let mut c = get_redis_conn()?;
loop {
if channel.is_closed() {
debug!("Channel has been closed, returning");
return Ok(());
}
loop {
if channel.is_closed() {
debug!("Channel has been closed, returning");
return Ok(());
}
let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT")
.arg(count)
.arg("STREAMS")
.arg(&key)
.arg(&last_id)
.query_async(&mut c)
.await
.context("XREAD frame stream")?;
let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT")
.arg(count)
.arg("STREAMS")
.arg(&key)
.arg(&last_id)
.query(&mut *c)
.context("XREAD frame stream")?;
for stream_key in &srr.keys {
for stream_id in &stream_key.ids {
last_id = stream_id.id.clone();
for (k, v) in &stream_id.map {
let res = || -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %last_id, "Frame-log received from stream");
if let redis::Value::Data(b) = v {
let pl = stream::UplinkFrameLog::decode(&mut Cursor::new(b))?;
let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?;
if pl.plaintext_f_opts {
if let Err(e) = phy.decode_f_opts_to_mac_commands() {
warn!(error = %e.full(), "Decode f_opts to mac-commands error");
}
}
if pl.plaintext_frm_payload {
if let Err(e) = phy.decode_frm_payload() {
warn!(error = %e.full(), "Decode frm_payload error");
}
}
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|t| prost_types::Timestamp {
seconds: t.seconds,
nanos: t.nanos,
}),
description: pl.m_type().into(),
body: json!({
"phy_payload": phy,
"tx_info": pl.tx_info,
"rx_info": pl.rx_info,
})
.to_string(),
properties: [
("DevAddr".to_string(), pl.dev_addr),
("DevEUI".to_string(), pl.dev_eui),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
for stream_key in &srr.keys {
for stream_id in &stream_key.ids {
last_id = stream_id.id.clone();
for (k, v) in &stream_id.map {
let res = || -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %last_id, "Frame-log received from stream");
if let redis::Value::Data(b) = v {
let pl = stream::UplinkFrameLog::decode(&mut Cursor::new(b))?;
let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?;
if pl.plaintext_f_opts {
if let Err(e) = phy.decode_f_opts_to_mac_commands() {
warn!(error = %e.full(), "Decode f_opts to mac-commands error");
}
}
"down" => {
trace!(key = %k, id = %last_id, "frame-log received from stream");
if let redis::Value::Data(b) = v {
let pl = stream::DownlinkFrameLog::decode(&mut Cursor::new(b))?;
let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?;
if pl.plaintext_f_opts {
if let Err(e) = phy.decode_f_opts_to_mac_commands() {
warn!(error = %e.full(), "Decode f_opts to mac-commands error");
}
}
if pl.plaintext_frm_payload {
if let Err(e) = phy.decode_frm_payload() {
warn!(error = %e.full(), "Decode frm_payload error");
}
}
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|t| prost_types::Timestamp {
seconds: t.seconds,
nanos: t.nanos,
}),
description: pl.m_type().into(),
body: json!({
"phy_payload": phy,
"tx_info": pl.tx_info,
})
.to_string(),
properties: [
("DevAddr".to_string(), pl.dev_addr),
("DevEUI".to_string(), pl.dev_eui),
("Gateway ID".to_string(), pl.gateway_id),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
if pl.plaintext_frm_payload {
if let Err(e) = phy.decode_frm_payload() {
warn!(error = %e.full(), "Decode frm_payload error");
}
}
_ => {
error!(key = %k, "Unexpected key in frame-log stream");
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|t| prost_types::Timestamp {
seconds: t.seconds,
nanos: t.nanos,
}),
description: pl.m_type().into(),
body: json!({
"phy_payload": phy,
"tx_info": pl.tx_info,
"rx_info": pl.rx_info,
})
.to_string(),
properties: [
("DevAddr".to_string(), pl.dev_addr),
("DevEUI".to_string(), pl.dev_eui),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"down" => {
trace!(key = %k, id = %last_id, "frame-log received from stream");
if let redis::Value::Data(b) = v {
let pl = stream::DownlinkFrameLog::decode(&mut Cursor::new(b))?;
let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?;
if pl.plaintext_f_opts {
if let Err(e) = phy.decode_f_opts_to_mac_commands() {
warn!(error = %e.full(), "Decode f_opts to mac-commands error");
}
}
if pl.plaintext_frm_payload {
if let Err(e) = phy.decode_frm_payload() {
warn!(error = %e.full(), "Decode frm_payload error");
}
}
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|t| prost_types::Timestamp {
seconds: t.seconds,
nanos: t.nanos,
}),
description: pl.m_type().into(),
body: json!({
"phy_payload": phy,
"tx_info": pl.tx_info,
})
.to_string(),
properties: [
("DevAddr".to_string(), pl.dev_addr),
("DevEUI".to_string(), pl.dev_eui),
("Gateway ID".to_string(), pl.gateway_id),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
Ok(())
}();
if let Err(e) = res {
// Return in case of channel error, in any other case we just log
// the error.
if e.downcast_ref::<mpsc::error::SendError<api::LogItem>>().is_some() {
return Err(e);
}
error!(key = %k, error = %e.full(), "Parsing frame-log error");
}
_ => {
error!(key = %k, "Unexpected key in frame-log stream");
}
}
Ok(())
}();
if let Err(e) = res {
// Return in case of channel error, in any other case we just log
// the error.
if e.downcast_ref::<mpsc::error::SendError<api::LogItem>>()
.is_some()
{
return Err(e);
}
error!(key = %k, error = %e.full(), "Parsing frame-log error");
}
}
// If we use xread with block=0, the connection can't be used by other requests. Now we
// check every 1 second if there are new messages, which should be sufficient.
sleep(Duration::from_secs(1));
}
}
}).await?
// If we use xread with block=0, the connection can't be used by other requests. Now we
// check every 1 second if there are new messages, which should be sufficient.
sleep(Duration::from_secs(1));
}
}
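Because a pooled connection cannot be parked on a blocking XREAD, both stream readers poll: read everything newer than the last seen ID, process it, then sleep for a second. Stripped of the payload handling, the loop reduces to roughly the sketch below (the function name is illustrative, and tokio's async sleep is assumed).

// Illustrative sketch of the polling pattern used by get_event_logs() and
// get_frame_logs(): a non-blocking XREAD from the last seen stream ID plus a
// short sleep, instead of XREAD BLOCK 0 (which would pin the pooled
// connection to a single reader).
async fn tail_stream_sketch(key: &str, count: usize) -> Result<()> {
    let mut last_id = "0".to_string();
    let mut c = get_async_redis_conn().await?;

    loop {
        // In the real functions the loop returns once the API channel closes.
        let srr: StreamReadReply = redis::cmd("XREAD")
            .arg("COUNT")
            .arg(count)
            .arg("STREAMS")
            .arg(key)
            .arg(&last_id)
            .query_async(&mut c)
            .await?;

        for stream_key in &srr.keys {
            for stream_id in &stream_key.ids {
                last_id = stream_id.id.clone();
                // ... decode stream_id.map and forward it to the channel ...
            }
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}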

View File

@ -1,60 +1,49 @@
use anyhow::Result;
use prost::Message;
use tokio::task;
use crate::config;
use crate::storage::{get_redis_conn, redis_key};
use crate::storage::{get_async_redis_conn, redis_key};
use chirpstack_api::stream;
pub async fn log_uplink(up: &stream::UplinkMeta) -> Result<()> {
    let conf = config::get();
    let mut c = get_async_redis_conn().await?;

    if conf.monitoring.meta_log_max_history > 0 {
        let key = redis_key("stream:meta".to_string());
        let b = up.encode_to_vec();
        redis::cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.meta_log_max_history)
            .arg("*")
            .arg("up")
            .arg(&b)
            .query_async(&mut c)
            .await?;
    }
    Ok(())
}
pub async fn log_downlink(down: &stream::DownlinkMeta) -> Result<()> {
    let conf = config::get();
    let mut c = get_async_redis_conn().await?;

    if conf.monitoring.meta_log_max_history > 0 {
        let key = redis_key("stream:meta".to_string());
        let b = down.encode_to_vec();
        redis::cmd("XADD")
            .arg(&key)
            .arg("MAXLEN")
            .arg(conf.monitoring.meta_log_max_history)
            .arg("*")
            .arg("down")
            .arg(&b)
            .query_async(&mut c)
            .await?;
    }
    Ok(())
}

View File

@ -12,7 +12,7 @@ use crate::gateway::backend::mock as gateway_mock;
use crate::integration::mock;
use crate::storage::{
device::{self, DeviceClass},
device_queue, device_session, downlink_frame, get_redis_conn, redis_key,
device_queue, device_session, downlink_frame, get_async_redis_conn, redis_key,
};
use chirpstack_api::{gw, integration as integration_pb, internal, stream};
use lrwn::EUI64;
@ -397,7 +397,7 @@ pub fn uplink_meta_log(um: stream::UplinkMeta) -> Validator {
Box::new(move || {
let um = um.clone();
Box::pin(async move {
let mut c = get_redis_conn().unwrap();
let mut c = get_async_redis_conn().await.unwrap();
let key = redis_key("stream:meta".to_string());
let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT")
@ -405,7 +405,8 @@ pub fn uplink_meta_log(um: stream::UplinkMeta) -> Validator {
.arg("STREAMS")
.arg(&key)
.arg("0")
.query(&mut *c)
.query_async(&mut c)
.await
.unwrap();
for stream_key in &srr.keys {
@ -433,7 +434,7 @@ pub fn device_uplink_frame_log(uf: stream::UplinkFrameLog) -> Validator {
Box::new(move || {
let uf = uf.clone();
Box::pin(async move {
let mut c = get_redis_conn().unwrap();
let mut c = get_async_redis_conn().await.unwrap();
let key = redis_key(format!("device:{{{}}}:stream:frame", uf.dev_eui));
let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT")
@ -441,7 +442,8 @@ pub fn device_uplink_frame_log(uf: stream::UplinkFrameLog) -> Validator {
.arg("STREAMS")
.arg(&key)
.arg("0")
.query(&mut *c)
.query_async(&mut c)
.await
.unwrap();
for stream_key in &srr.keys {

View File

@ -37,8 +37,8 @@ async fn test_fns_uplink() {
});
config::set(conf);
joinserver::setup().unwrap();
roaming::setup().unwrap();
joinserver::setup().await.unwrap();
roaming::setup().await.unwrap();
let t = tenant::create(tenant::Tenant {
name: "tenant".into(),
@ -168,7 +168,6 @@ async fn test_fns_uplink() {
sns_pr_start_req_mock.delete();
joinserver::reset();
roaming::reset();
}
#[tokio::test]
@ -188,8 +187,8 @@ async fn test_sns_uplink() {
});
config::set(conf);
joinserver::setup().unwrap();
roaming::setup().unwrap();
joinserver::setup().await.unwrap();
roaming::setup().await.unwrap();
let t = tenant::create(tenant::Tenant {
name: "tenant".into(),
@ -436,8 +435,8 @@ async fn test_sns_roaming_not_allowed() {
});
config::set(conf);
joinserver::setup().unwrap();
roaming::setup().unwrap();
joinserver::setup().await.unwrap();
roaming::setup().await.unwrap();
let t = tenant::create(tenant::Tenant {
name: "tenant".into(),
@ -618,8 +617,8 @@ async fn test_sns_dev_not_found() {
});
config::set(conf);
joinserver::setup().unwrap();
roaming::setup().unwrap();
joinserver::setup().await.unwrap();
roaming::setup().await.unwrap();
let mut dev_addr = lrwn::DevAddr::from_be_bytes([0, 0, 0, 0]);
dev_addr.set_dev_addr_prefix(lrwn::NetID::from_str("000505").unwrap().dev_addr_prefix());

View File

@ -368,7 +368,7 @@ async fn run_test(t: &Test) {
}];
config::set(conf);
region::setup().unwrap();
joinserver::setup().unwrap();
joinserver::setup().await.unwrap();
integration::set_mock().await;
gateway_backend::set_backend(&"eu868", Box::new(gateway_backend::mock::Backend {})).await;

View File

@ -46,8 +46,8 @@ async fn test_fns() {
}];
config::set(conf);
joinserver::setup().unwrap();
roaming::setup().unwrap();
joinserver::setup().await.unwrap();
roaming::setup().await.unwrap();
let t = tenant::create(tenant::Tenant {
name: "tenant".into(),
@ -247,7 +247,6 @@ async fn test_fns() {
.await;
joinserver::reset();
roaming::reset();
}
#[tokio::test]
@ -269,8 +268,8 @@ async fn test_sns() {
});
config::set(conf);
joinserver::setup().unwrap();
roaming::setup().unwrap();
joinserver::setup().await.unwrap();
roaming::setup().await.unwrap();
let t = tenant::create(tenant::Tenant {
name: "tenant".into(),
@ -432,7 +431,6 @@ async fn test_sns() {
);
joinserver::reset();
roaming::reset();
}
#[tokio::test]
@ -454,8 +452,8 @@ async fn test_sns_roaming_not_allowed() {
});
config::set(conf);
joinserver::setup().unwrap();
roaming::setup().unwrap();
joinserver::setup().await.unwrap();
roaming::setup().await.unwrap();
let t = tenant::create(tenant::Tenant {
name: "tenant".into(),
@ -591,5 +589,4 @@ async fn test_sns_roaming_not_allowed() {
);
joinserver::reset();
roaming::reset();
}

View File

@ -125,7 +125,7 @@ impl Data {
};
let net_id = NetID::from_slice(&ds.net_id)?;
let client = roaming::get(&net_id)?;
let client = roaming::get(&net_id).await?;
let async_receiver = match client.is_async() {
false => None,
true => Some(
@ -183,7 +183,7 @@ impl Data {
pr_req.base.transaction_id = 1234;
}
let client = roaming::get(&net_id)?;
let client = roaming::get(&net_id).await?;
let async_receiver = match client.is_async() {
false => None,
true => Some(

View File

@ -40,7 +40,7 @@ impl JoinRequest {
ctx.filter_rx_info_by_public_only()?;
ctx.get_home_net_id().await?;
ctx.get_client()?;
ctx.get_client().await?;
ctx.start_roaming().await?;
ctx.save_roaming_session().await?;
@ -94,10 +94,10 @@ impl JoinRequest {
Ok(())
}
fn get_client(&mut self) -> Result<()> {
async fn get_client(&mut self) -> Result<()> {
let net_id = self.home_net_id.as_ref().unwrap();
trace!(net_id = %net_id, "Getting backend interfaces client");
self.client = Some(roaming::get(net_id)?);
self.client = Some(roaming::get(net_id).await?);
Ok(())
}

View File

@ -10,7 +10,6 @@ use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prost::Message;
use tokio::task;
use tokio::time::sleep;
use tracing::{debug, error, info, span, trace, warn, Instrument, Level};
use uuid::Uuid;
@ -19,7 +18,7 @@ use crate::config;
use crate::helpers::errors::PrintFullError;
use crate::monitoring::prometheus;
use crate::storage::{
device, device_profile, error::Error as StorageError, gateway, get_redis_conn, redis_key,
device, device_profile, error::Error as StorageError, gateway, get_async_redis_conn, redis_key,
};
use crate::stream;
use chirpstack_api::{common, gw, internal, stream as stream_pb};
@ -221,92 +220,76 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> {
}
async fn deduplicate_put(key: &str, ttl: Duration, event: &gw::UplinkFrame) -> Result<()> {
    let event_b = event.encode_to_vec();
    let mut c = get_async_redis_conn().await?;

    redis::pipe()
        .atomic()
        .cmd("SADD")
        .arg(key)
        .arg(event_b)
        .ignore()
        .cmd("PEXPIRE")
        .arg(key)
        .arg(ttl.as_millis() as usize)
        .ignore()
        .query_async(&mut c)
        .await
        .context("Deduplication put")?;

    Ok(())
}
async fn deduplicate_locked(key: &str, ttl: Duration) -> Result<bool> {
    let mut c = get_async_redis_conn().await?;

    let set: bool = redis::cmd("SET")
        .arg(key)
        .arg("lock")
        .arg("PX")
        .arg(ttl.as_millis() as usize)
        .arg("NX")
        .query_async(&mut c)
        .await
        .context("Deduplication locked")?;

    Ok(!set)
}
async fn deduplicate_collect(key: &str) -> Result<gw::UplinkFrameSet> {
    let mut c = get_async_redis_conn().await?;
    let items_b: Vec<Vec<u8>> = redis::cmd("SMEMBERS")
        .arg(&key)
        .query_async(&mut c)
        .await
        .context("Deduplication collect")?;

    if items_b.is_empty() {
        return Err(anyhow!("Zero items in collect set"));
    }

    let mut pl = gw::UplinkFrameSet {
        ..Default::default()
    };

    for b in items_b {
        let event = gw::UplinkFrame::decode(&mut Cursor::new(b)).context("Decode UplinkFrame")?;

        if event.tx_info.is_none() {
            warn!("tx_info of uplink event is empty, skipping");
            continue;
        }

        if event.rx_info.is_none() {
            warn!("rx_info of uplink event is empty, skipping");
            continue;
        }

        pl.tx_info = event.tx_info;
        pl.rx_info.push(event.rx_info.unwrap());
        pl.phy_payload = event.phy_payload;
    }

    Ok(pl)
}
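Taken together, these helpers implement the deduplication window: every copy of the frame is SADDed into a collect set, the first receiver within the window takes a SET NX PX lock, and after the window expires the set is folded into one UplinkFrameSet. A compressed sketch of that flow follows; the key names and window length are illustrative, and _deduplicate_uplink() above is the real entry point.

// Illustrative sketch of how deduplicate_put, deduplicate_locked and
// deduplicate_collect fit together. Key names and window length are
// placeholders, not the values used by _deduplicate_uplink().
async fn deduplicate_sketch(dedup_key: &str, event: gw::UplinkFrame) -> Result<()> {
    let lock_key = redis_key(format!("up:lock:{{{}}}", dedup_key));
    let collect_key = redis_key(format!("up:collect:{{{}}}", dedup_key));
    let window = Duration::from_millis(200);

    // Every receiver contributes its copy (the rx metadata differs per gateway).
    deduplicate_put(&collect_key, window * 2, &event).await?;

    // deduplicate_locked returns true when another receiver already holds the
    // lock, i.e. a collect is already scheduled for this key.
    if deduplicate_locked(&lock_key, window * 2).await? {
        return Ok(());
    }

    // First receiver: wait for the window to pass, then fold the set into a
    // single UplinkFrameSet and hand it to the uplink handler.
    sleep(window).await;
    let ufs = deduplicate_collect(&collect_key).await?;
    let _ = ufs; // e.g. passed on to handle_uplink() in the real code path
    Ok(())
}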
pub async fn handle_uplink(deduplication_id: Uuid, uplink: gw::UplinkFrameSet) -> Result<()> {