Make deduplication_delay configurable. Handle duplicated acks.

This commit is contained in:
Orne Brocaar 2022-07-27 20:19:02 +01:00
parent 35b2f4112d
commit a53a90b646
4 changed files with 56 additions and 22 deletions
chirpstack/src
cmd
config.rs
gateway/backend
uplink

@ -127,6 +127,14 @@ pub fn run() {
# after no activity.
device_session_ttl="{{ network.device_session_ttl }}"
# Time to wait for uplink de-duplication.
#
# This is the time that ChirpStack will wait for other gateways to receive
# the same uplink frame. Please note that this value affects the
# roundtrip time. The total roundtrip time (which includes network latency)
# must be less than the (first) receive-window.
deduplication_delay="{{ network.deduplication_delay }}"
# Mac-commands disabled.
mac_commands_disabled={{ network.mac_commands_disabled }}

@ -151,6 +151,8 @@ pub struct Network {
pub enabled_regions: Vec<String>,
#[serde(with = "humantime_serde")]
pub device_session_ttl: Duration,
#[serde(with = "humantime_serde")]
pub deduplication_delay: Duration,
pub mac_commands_disabled: bool,
pub adr_plugins: Vec<String>,
pub scheduler: Scheduler,
@ -162,6 +164,7 @@ impl Default for Network {
net_id: NetID::from_be_bytes([0x00, 0x00, 0x00]),
enabled_regions: vec!["eu868".into()],
device_session_ttl: Duration::from_secs(60 * 60 * 24 * 31),
deduplication_delay: Duration::from_millis(200),
mac_commands_disabled: false,
adr_plugins: vec![],
scheduler: Default::default(),

@ -1,3 +1,5 @@
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::io::Cursor;
use std::time::Duration;
@ -11,11 +13,13 @@ use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prost::Message;
use serde::Serialize;
use tokio::task;
use tracing::{error, info, trace};
use super::GatewayBackend;
use crate::config::GatewayBackendMqtt;
use crate::monitoring::prometheus;
use crate::storage::{get_redis_conn, redis_key};
use crate::{downlink, uplink};
use lrwn::region::CommonName;
@ -231,14 +235,29 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
let qos = msg.qos();
let b = msg.payload();
info!(
region_name = region_name,
topic = topic,
qos = qos,
"Message received from gateway"
);
let mut hasher = DefaultHasher::new();
hasher.write(&b);
let key = redis_key(format!("gw:mqtt:lock:{:x}", hasher.finish()));
let locked = is_locked(key).await;
let err = || -> Result<()> {
if locked? {
trace!(
region_name = region_name,
topic = topic,
qos = qos,
"Message is already handled by different instance"
);
return Ok(());
}
info!(
region_name = region_name,
topic = topic,
qos = qos,
"Message received from gateway"
);
if topic.ends_with("/up") {
EVENT_COUNTER
.get_or_create(&EventLabels {
@ -321,3 +340,22 @@ fn connection_lost_callback(client: &mqtt::AsyncClient) {
"MQTT connection to broker lost"
);
}
async fn is_locked(key: String) -> Result<bool> {
task::spawn_blocking({
move || -> Result<bool> {
let mut c = get_redis_conn()?;
let set: bool = redis::cmd("SET")
.arg(key)
.arg("lock")
.arg("PX")
.arg(5000)
.arg("NX")
.query(&mut *c)?;
Ok(!set)
}
})
.await?
}

@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::io::Cursor;
use std::str::FromStr;
use std::sync::RwLock;
use std::time::{Duration, SystemTime};
use anyhow::{Context, Result};
@ -29,10 +28,6 @@ pub mod join_fns;
pub mod join_sns;
pub mod stats;
lazy_static! {
static ref DEDUPLICATION_DELAY: RwLock<Duration> = RwLock::new(Duration::from_millis(200));
}
#[derive(Clone)]
pub struct UplinkFrameSet {
pub uplink_set_id: Uuid,
@ -105,16 +100,6 @@ pub struct RoamingMetaData {
pub ul_meta_data: backend::ULMetaData,
}
pub fn get_deduplication_delay() -> Duration {
let dur_r = DEDUPLICATION_DELAY.read().unwrap();
*dur_r
}
pub fn set_deduplication_delay(d: Duration) {
let mut dur_w = DEDUPLICATION_DELAY.write().unwrap();
*dur_w = d;
}
pub async fn deduplicate_uplink(event: gw::UplinkFrame) {
if let Err(e) = _deduplicate_uplink(event).await {
error!(error = %e, "Deduplication error");
@ -131,7 +116,7 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> {
let key = redis_key(format!("up:collect:{}:{}", tx_info_str, phy_str));
let lock_key = redis_key(format!("up:collect:{}:{}:lock", tx_info_str, phy_str));
let dedup_delay = get_deduplication_delay();
let dedup_delay = config::get().network.deduplication_delay;
let mut dedup_ttl = dedup_delay * 2;
if dedup_ttl < Duration::from_millis(200) {
dedup_ttl = Duration::from_millis(200);