diff --git a/chirpstack/src/uplink/mod.rs b/chirpstack/src/uplink/mod.rs index 01aa39dd..bd84f834 100644 --- a/chirpstack/src/uplink/mod.rs +++ b/chirpstack/src/uplink/mod.rs @@ -169,8 +169,8 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> { None => "".to_string(), }; - let key = redis_key(format!("up:collect:{}:{}", tx_info_str, phy_str)); - let lock_key = redis_key(format!("up:collect:{}:{}:lock", tx_info_str, phy_str)); + let key = redis_key(format!("up:collect:{{{}:{}}}", tx_info_str, phy_str)); + let lock_key = redis_key(format!("up:collect:{{{}:{}}}:lock", tx_info_str, phy_str)); let dedup_delay = config::get().network.deduplication_delay; let mut dedup_ttl = dedup_delay * 2; @@ -180,15 +180,10 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> { trace!( key = key.as_str(), - "Adding uplink event to deduplication set" + "Adding uplink event to deduplication set and getting lock" ); - deduplicate_put(&key, dedup_ttl, &event).await?; - - trace!( - lock_key = lock_key.as_str(), - "Requesting deduplication lock" - ); - if deduplicate_locked(&lock_key, dedup_ttl).await? { + let locked = deduplicate_put(&key, &lock_key, dedup_ttl, &event).await?; + if locked { trace!( lock_key = lock_key.as_str(), "Deduplication is already locked by an other process" @@ -219,38 +214,36 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> { Ok(()) } -async fn deduplicate_put(key: &str, ttl: Duration, event: &gw::UplinkFrame) -> Result<()> { +async fn deduplicate_put( + collect_key: &str, + lock_key: &str, + ttl: Duration, + event: &gw::UplinkFrame, +) -> Result { let event_b = event.encode_to_vec(); - redis::pipe() + let (lock_set,): (bool,) = redis::pipe() .atomic() .cmd("SADD") - .arg(key) + .arg(collect_key) .arg(event_b) .ignore() .cmd("PEXPIRE") - .arg(key) + .arg(collect_key) .arg(ttl.as_millis() as usize) .ignore() - .query_async(&mut get_async_redis_conn().await?) - .await - .context("Deduplication put")?; - - Ok(()) -} - -async fn deduplicate_locked(key: &str, ttl: Duration) -> Result { - let set: bool = redis::cmd("SET") - .arg(key) + .cmd("SET") + .arg(lock_key) .arg("lock") .arg("PX") .arg(ttl.as_millis() as usize) .arg("NX") .query_async(&mut get_async_redis_conn().await?) .await - .context("Deduplication locked")?; + .context("Deduplication put and get lock")?; - Ok(!set) + // We get true if we were able to set the lock, thus true == not yet locked. + Ok(!lock_set) } async fn deduplicate_collect(key: &str) -> Result {