From a53a90b64612dbba398e1240bfb9117947fa95c4 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Wed, 27 Jul 2022 20:19:02 +0100 Subject: [PATCH] Make deduplication_delay configurable. Handle duplicated acks. --- chirpstack/src/cmd/configfile.rs | 8 +++++ chirpstack/src/config.rs | 3 ++ chirpstack/src/gateway/backend/mqtt.rs | 50 ++++++++++++++++++++++---- chirpstack/src/uplink/mod.rs | 17 +-------- 4 files changed, 56 insertions(+), 22 deletions(-) diff --git a/chirpstack/src/cmd/configfile.rs b/chirpstack/src/cmd/configfile.rs index f01b08fe..74bdee0a 100644 --- a/chirpstack/src/cmd/configfile.rs +++ b/chirpstack/src/cmd/configfile.rs @@ -127,6 +127,14 @@ pub fn run() { # after no activity. device_session_ttl="{{ network.device_session_ttl }}" + # Time to wait for uplink de-duplication. + # + # This is the time that ChirpStack will wait for other gateways to receive + # the same uplink frame. Please note that this value affects the + # roundtrip time. The total roundtrip time (which includes network latency) + # must be less than the (first) receive-window. + deduplication_delay="{{ network.deduplication_delay }}" + # Mac-commands disabled. mac_commands_disabled={{ network.mac_commands_disabled }} diff --git a/chirpstack/src/config.rs b/chirpstack/src/config.rs index 85640beb..7db032f7 100644 --- a/chirpstack/src/config.rs +++ b/chirpstack/src/config.rs @@ -151,6 +151,8 @@ pub struct Network { pub enabled_regions: Vec, #[serde(with = "humantime_serde")] pub device_session_ttl: Duration, + #[serde(with = "humantime_serde")] + pub deduplication_delay: Duration, pub mac_commands_disabled: bool, pub adr_plugins: Vec, pub scheduler: Scheduler, @@ -162,6 +164,7 @@ impl Default for Network { net_id: NetID::from_be_bytes([0x00, 0x00, 0x00]), enabled_regions: vec!["eu868".into()], device_session_ttl: Duration::from_secs(60 * 60 * 24 * 31), + deduplication_delay: Duration::from_millis(200), mac_commands_disabled: false, adr_plugins: vec![], scheduler: Default::default(), diff --git a/chirpstack/src/gateway/backend/mqtt.rs b/chirpstack/src/gateway/backend/mqtt.rs index 2d0ff491..8f4186ab 100644 --- a/chirpstack/src/gateway/backend/mqtt.rs +++ b/chirpstack/src/gateway/backend/mqtt.rs @@ -1,3 +1,5 @@ +use std::collections::hash_map::DefaultHasher; +use std::hash::Hasher; use std::io::Cursor; use std::time::Duration; @@ -11,11 +13,13 @@ use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prost::Message; use serde::Serialize; +use tokio::task; use tracing::{error, info, trace}; use super::GatewayBackend; use crate::config::GatewayBackendMqtt; use crate::monitoring::prometheus; +use crate::storage::{get_redis_conn, redis_key}; use crate::{downlink, uplink}; use lrwn::region::CommonName; @@ -231,14 +235,29 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg let qos = msg.qos(); let b = msg.payload(); - info!( - region_name = region_name, - topic = topic, - qos = qos, - "Message received from gateway" - ); + let mut hasher = DefaultHasher::new(); + hasher.write(&b); + let key = redis_key(format!("gw:mqtt:lock:{:x}", hasher.finish())); + let locked = is_locked(key).await; let err = || -> Result<()> { + if locked? { + trace!( + region_name = region_name, + topic = topic, + qos = qos, + "Message is already handled by different instance" + ); + return Ok(()); + } + + info!( + region_name = region_name, + topic = topic, + qos = qos, + "Message received from gateway" + ); + if topic.ends_with("/up") { EVENT_COUNTER .get_or_create(&EventLabels { @@ -321,3 +340,22 @@ fn connection_lost_callback(client: &mqtt::AsyncClient) { "MQTT connection to broker lost" ); } + +async fn is_locked(key: String) -> Result { + task::spawn_blocking({ + move || -> Result { + let mut c = get_redis_conn()?; + + let set: bool = redis::cmd("SET") + .arg(key) + .arg("lock") + .arg("PX") + .arg(5000) + .arg("NX") + .query(&mut *c)?; + + Ok(!set) + } + }) + .await? +} diff --git a/chirpstack/src/uplink/mod.rs b/chirpstack/src/uplink/mod.rs index 711ae74b..a3db4e77 100644 --- a/chirpstack/src/uplink/mod.rs +++ b/chirpstack/src/uplink/mod.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::io::Cursor; use std::str::FromStr; -use std::sync::RwLock; use std::time::{Duration, SystemTime}; use anyhow::{Context, Result}; @@ -29,10 +28,6 @@ pub mod join_fns; pub mod join_sns; pub mod stats; -lazy_static! { - static ref DEDUPLICATION_DELAY: RwLock = RwLock::new(Duration::from_millis(200)); -} - #[derive(Clone)] pub struct UplinkFrameSet { pub uplink_set_id: Uuid, @@ -105,16 +100,6 @@ pub struct RoamingMetaData { pub ul_meta_data: backend::ULMetaData, } -pub fn get_deduplication_delay() -> Duration { - let dur_r = DEDUPLICATION_DELAY.read().unwrap(); - *dur_r -} - -pub fn set_deduplication_delay(d: Duration) { - let mut dur_w = DEDUPLICATION_DELAY.write().unwrap(); - *dur_w = d; -} - pub async fn deduplicate_uplink(event: gw::UplinkFrame) { if let Err(e) = _deduplicate_uplink(event).await { error!(error = %e, "Deduplication error"); @@ -131,7 +116,7 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> { let key = redis_key(format!("up:collect:{}:{}", tx_info_str, phy_str)); let lock_key = redis_key(format!("up:collect:{}:{}:lock", tx_info_str, phy_str)); - let dedup_delay = get_deduplication_delay(); + let dedup_delay = config::get().network.deduplication_delay; let mut dedup_ttl = dedup_delay * 2; if dedup_ttl < Duration::from_millis(200) { dedup_ttl = Duration::from_millis(200);