From 54f74277abfa3ae0c7116492c6f6273faaaa6edb Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Wed, 27 Apr 2022 14:10:53 +0100 Subject: [PATCH] Improve frame-counter validation. --- chirpstack/src/storage/device_session.rs | 32 ++++++++++++++++-- chirpstack/src/uplink/data.rs | 41 ++++++++++++++++++------ 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/chirpstack/src/storage/device_session.rs b/chirpstack/src/storage/device_session.rs index 1007033c..1c60ce09 100644 --- a/chirpstack/src/storage/device_session.rs +++ b/chirpstack/src/storage/device_session.rs @@ -119,7 +119,7 @@ pub async fn delete(dev_eui: &EUI64) -> Result<()> { // Return the device-session matching the given PhyPayload. This will fetch all device-session // associated with the used DevAddr and based on f_cont and mic, decides which one to use. -pub async fn get_for_phypayload( +pub async fn get_for_phypayload_and_incr_f_cnt_up( phy: &PhyPayload, tx_dr: u8, tx_ch: u8, @@ -193,6 +193,34 @@ pub async fn get_for_phypayload( }; if full_f_cnt >= ds.f_cnt_up { + // Make sure that in case of concurrent calls for the same uplink only one will + // pass. Either the concurrent call would read the incremented uplink frame-counter + // or it is unable to acquire the lock. + let mut c = get_redis_conn()?; + let lock_key = redis_key(format!( + "device:{{{}}}:ds:lock:{}", + hex::encode(&ds.dev_eui), + full_f_cnt, + )); + let set: bool = redis::cmd("SET") + .arg(&lock_key) + .arg("lock") + .arg("EX") + .arg(1_usize) + .arg("NX") + .query(&mut *c)?; + + if !set { + return Ok(ValidationStatus::Retransmission(full_f_cnt, ds)); + } + + // We immediately save the device-session to make sure that concurrent calls for + // the same uplink will fail on the frame-counter validation. 
+ let ds_f_cnt_up = ds.f_cnt_up; + ds.f_cnt_up = full_f_cnt + 1; + save(&ds).await?; + ds.f_cnt_up = ds_f_cnt_up; + return Ok(ValidationStatus::Ok(full_f_cnt, ds)); } else if ds.skip_f_cnt_check { // re-transmission or frame-counter reset @@ -542,7 +570,7 @@ pub mod test { ) .unwrap(); - let ds_res = get_for_phypayload(&phy, 0, 0).await; + let ds_res = get_for_phypayload_and_incr_f_cnt_up(&phy, 0, 0).await; if tst.expected_error.is_some() { assert_eq!(true, ds_res.is_err()); assert_eq!( diff --git a/chirpstack/src/uplink/data.rs b/chirpstack/src/uplink/data.rs index 002c6d76..51ac411e 100644 --- a/chirpstack/src/uplink/data.rs +++ b/chirpstack/src/uplink/data.rs @@ -72,12 +72,12 @@ impl Data { ctx.get_device_profile().await?; ctx.get_application().await?; ctx.get_tenant().await?; - ctx.abort_on_device_is_disabled()?; + ctx.abort_on_device_is_disabled().await?; ctx.set_device_info()?; ctx.handle_retransmission_reset().await?; ctx.set_device_lock().await?; ctx.set_scheduler_run_after().await?; - ctx.filter_rx_info_by_tenant()?; + ctx.filter_rx_info_by_tenant().await?; ctx.decrypt_f_opts_mac_commands()?; ctx.decrypt_frm_payload()?; ctx.get_mac_payload()?; @@ -105,7 +105,7 @@ impl Data { trace!("Getting device-session for dev_addr"); if let lrwn::Payload::MACPayload(pl) = &self.uplink_frame_set.phy_payload.payload { - match device_session::get_for_phypayload( + match device_session::get_for_phypayload_and_incr_f_cnt_up( &self.uplink_frame_set.phy_payload, self.uplink_frame_set.dr, self.uplink_frame_set.ch as u8, @@ -202,10 +202,17 @@ impl Data { Ok(()) } - fn abort_on_device_is_disabled(&self) -> Result<(), Error> { + async fn abort_on_device_is_disabled(&self) -> Result<(), Error> { let device = self.device.as_ref().unwrap(); if device.is_disabled { + // Restore the device-session in case the device is disabled. + // This is because during the fcnt validation, we immediately store the + // device-session with incremented fcnt to avoid race conditions. 
+ device_session::save(self.device_session.as_ref().unwrap()) + .await + .context("Savel device-session")?; + info!(dev_eui = %device.dev_eui, "Device is disabled, aborting flow"); return Err(Error::Abort); } @@ -215,13 +222,13 @@ impl Data { async fn handle_retransmission_reset(&self) -> Result<(), Error> { trace!("Handle retransmission and reset"); + let dev = self.device.as_ref().unwrap(); - if (!self.retransmission && !self.reset) || self.device.as_ref().unwrap().skip_fcnt_check { + if (!self.retransmission && !self.reset) || dev.skip_fcnt_check { return Ok(()); } let app = self.application.as_ref().unwrap(); - let dev = self.device.as_ref().unwrap(); let ts: DateTime = helpers::get_rx_timestamp(&self.uplink_frame_set.rx_info_set).into(); @@ -291,14 +298,25 @@ impl Data { Ok(()) } - fn filter_rx_info_by_tenant(&mut self) -> Result<()> { + async fn filter_rx_info_by_tenant(&mut self) -> Result<()> { trace!("Filtering rx_info by tenant_id"); - filter_rx_info_by_tenant_id( + match filter_rx_info_by_tenant_id( &self.application.as_ref().unwrap().tenant_id, &mut self.uplink_frame_set, - )?; - Ok(()) + ) { + Ok(_) => Ok(()), + Err(v) => { + // Restore the device-session in case of an error (no gateways available). + // This is because during the fcnt validation, we immediately store the + // device-session with incremented fcnt to avoid race conditions. + device_session::save(self.device_session.as_ref().unwrap()) + .await + .context("Savel device-session")?; + + Err(v) + } + } } fn decrypt_f_opts_mac_commands(&mut self) -> Result<()> { @@ -604,6 +622,9 @@ impl Data { Ok(()) } + // for "normal" uplinks, this is already set by the get_for_phypayload_and_incr_f_cnt_up + // function, however in case of retransmission or reset (if skip_fcnt_check) this is still + // required. fn sync_uplink_f_cnt(&mut self) -> Result<()> { trace!("Syncing uplink frame-counter"); let mut ds = self.device_session.as_mut().unwrap();