Improve frame-counter validation.

This commit is contained in:
Orne Brocaar 2022-04-27 14:10:53 +01:00
parent 24b975b337
commit 54f74277ab
2 changed files with 61 additions and 12 deletions

View File

@ -119,7 +119,7 @@ pub async fn delete(dev_eui: &EUI64) -> Result<()> {
// Return the device-session matching the given PhyPayload. This will fetch all device-session
// associated with the used DevAddr and based on f_cont and mic, decides which one to use.
pub async fn get_for_phypayload(
pub async fn get_for_phypayload_and_incr_f_cnt_up(
phy: &PhyPayload,
tx_dr: u8,
tx_ch: u8,
@ -193,6 +193,34 @@ pub async fn get_for_phypayload(
};
if full_f_cnt >= ds.f_cnt_up {
// Make sure that in case of concurrent calls for the same uplink only one will
// pass. Either the concurrent call would read the incremented uplink frame-counter
// or it is unable to aquire the lock.
let mut c = get_redis_conn()?;
let lock_key = redis_key(format!(
"device:{{{}}}:ds:lock:{}",
hex::encode(&ds.dev_eui),
full_f_cnt,
));
let set: bool = redis::cmd("SET")
.arg(&lock_key)
.arg("lock")
.arg("EX")
.arg(1_usize)
.arg("NX")
.query(&mut *c)?;
if !set {
return Ok(ValidationStatus::Retransmission(full_f_cnt, ds));
}
// We immediately save the device-session to make sure that concurrent calls for
// the same uplink will fail on the frame-counter validation.
let ds_f_cnt_up = ds.f_cnt_up;
ds.f_cnt_up = full_f_cnt + 1;
save(&ds).await?;
ds.f_cnt_up = ds_f_cnt_up;
return Ok(ValidationStatus::Ok(full_f_cnt, ds));
} else if ds.skip_f_cnt_check {
// re-transmission or frame-counter reset
@ -542,7 +570,7 @@ pub mod test {
)
.unwrap();
let ds_res = get_for_phypayload(&phy, 0, 0).await;
let ds_res = get_for_phypayload_and_incr_f_cnt_up(&phy, 0, 0).await;
if tst.expected_error.is_some() {
assert_eq!(true, ds_res.is_err());
assert_eq!(

View File

@ -72,12 +72,12 @@ impl Data {
ctx.get_device_profile().await?;
ctx.get_application().await?;
ctx.get_tenant().await?;
ctx.abort_on_device_is_disabled()?;
ctx.abort_on_device_is_disabled().await?;
ctx.set_device_info()?;
ctx.handle_retransmission_reset().await?;
ctx.set_device_lock().await?;
ctx.set_scheduler_run_after().await?;
ctx.filter_rx_info_by_tenant()?;
ctx.filter_rx_info_by_tenant().await?;
ctx.decrypt_f_opts_mac_commands()?;
ctx.decrypt_frm_payload()?;
ctx.get_mac_payload()?;
@ -105,7 +105,7 @@ impl Data {
trace!("Getting device-session for dev_addr");
if let lrwn::Payload::MACPayload(pl) = &self.uplink_frame_set.phy_payload.payload {
match device_session::get_for_phypayload(
match device_session::get_for_phypayload_and_incr_f_cnt_up(
&self.uplink_frame_set.phy_payload,
self.uplink_frame_set.dr,
self.uplink_frame_set.ch as u8,
@ -202,10 +202,17 @@ impl Data {
Ok(())
}
fn abort_on_device_is_disabled(&self) -> Result<(), Error> {
async fn abort_on_device_is_disabled(&self) -> Result<(), Error> {
let device = self.device.as_ref().unwrap();
if device.is_disabled {
// Restore the device-session in case the device is disabled.
// This is because during the fcnt validation, we immediately store the
// device-session with incremented fcnt to avoid race conditions.
device_session::save(self.device_session.as_ref().unwrap())
.await
.context("Savel device-session")?;
info!(dev_eui = %device.dev_eui, "Device is disabled, aborting flow");
return Err(Error::Abort);
}
@ -215,13 +222,13 @@ impl Data {
async fn handle_retransmission_reset(&self) -> Result<(), Error> {
trace!("Handle retransmission and reset");
let dev = self.device.as_ref().unwrap();
if (!self.retransmission && !self.reset) || self.device.as_ref().unwrap().skip_fcnt_check {
if (!self.retransmission && !self.reset) || dev.skip_fcnt_check {
return Ok(());
}
let app = self.application.as_ref().unwrap();
let dev = self.device.as_ref().unwrap();
let ts: DateTime<Utc> =
helpers::get_rx_timestamp(&self.uplink_frame_set.rx_info_set).into();
@ -291,14 +298,25 @@ impl Data {
Ok(())
}
fn filter_rx_info_by_tenant(&mut self) -> Result<()> {
async fn filter_rx_info_by_tenant(&mut self) -> Result<()> {
trace!("Filtering rx_info by tenant_id");
filter_rx_info_by_tenant_id(
match filter_rx_info_by_tenant_id(
&self.application.as_ref().unwrap().tenant_id,
&mut self.uplink_frame_set,
)?;
Ok(())
) {
Ok(_) => Ok(()),
Err(v) => {
// Restore the device-session in case of an error (no gateways available).
// This is because during the fcnt validation, we immediately store the
// device-session with incremented fcnt to avoid race conditions.
device_session::save(self.device_session.as_ref().unwrap())
.await
.context("Savel device-session")?;
Err(v)
}
}
}
fn decrypt_f_opts_mac_commands(&mut self) -> Result<()> {
@ -604,6 +622,9 @@ impl Data {
Ok(())
}
// for "normal" uplinks, this is already set by the get_for_phypayload_and_incr_f_cnt_up
// function, however in case of retransmission or reset (if skip_fcnt_check) this is still
// required.
fn sync_uplink_f_cnt(&mut self) -> Result<()> {
trace!("Syncing uplink frame-counter");
let mut ds = self.device_session.as_mut().unwrap();