From c954cd36456e2e9256b1103bea5c9444ec8a0a3b Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Fri, 9 May 2025 14:27:25 +0100 Subject: [PATCH] Store temp. mac-command state in device-session. --- api/proto/internal/internal.proto | 3 + .../proto/chirpstack/internal/internal.proto | 3 + chirpstack/src/downlink/data.rs | 82 +++----- chirpstack/src/maccommand/mod.rs | 9 +- chirpstack/src/storage/mac_command.rs | 82 ++------ chirpstack/src/test/class_a_test.rs | 196 ++++++++---------- lrwn/src/maccommand.rs | 9 + 7 files changed, 147 insertions(+), 237 deletions(-) diff --git a/api/proto/internal/internal.proto b/api/proto/internal/internal.proto index 489eb97c..1b8e93d8 100644 --- a/api/proto/internal/internal.proto +++ b/api/proto/internal/internal.proto @@ -142,6 +142,9 @@ message DeviceSession { // Relay state. Relay relay = 41; + + // Pending mac-commands. + map mac_command_pending = 43; } message UplinkAdrHistory { diff --git a/api/rust/proto/chirpstack/internal/internal.proto b/api/rust/proto/chirpstack/internal/internal.proto index 489eb97c..1b8e93d8 100644 --- a/api/rust/proto/chirpstack/internal/internal.proto +++ b/api/rust/proto/chirpstack/internal/internal.proto @@ -142,6 +142,9 @@ message DeviceSession { // Relay state. Relay relay = 41; + + // Pending mac-commands. + map mac_command_pending = 43; } message UplinkAdrHistory { diff --git a/chirpstack/src/downlink/data.rs b/chirpstack/src/downlink/data.rs index bd059c5e..05296379 100644 --- a/chirpstack/src/downlink/data.rs +++ b/chirpstack/src/downlink/data.rs @@ -707,7 +707,8 @@ impl Data { fn set_phy_payloads(&mut self) -> Result<()> { trace!("Setting downlink PHYPayloads"); let mut f_pending = self.more_device_queue_items; - let ds = self.device.get_device_session()?; + let dev_addr = self.device.get_dev_addr()?; + let ds = self.device.get_device_session_mut()?; for item in self.downlink_frame_items.iter_mut() { let mut mac_size: usize = 0; @@ -729,6 +730,8 @@ impl Data { for mac in &**mac_set { mac_commands.push(mac.clone()); } + + mac_command::set_pending(ds, mac_set)?; } // LoRaWAN MHDR @@ -740,7 +743,7 @@ impl Data { // LoRaWAN MAC payload let mut mac_pl = lrwn::MACPayload { fhdr: lrwn::FHDR { - devaddr: self.device.get_dev_addr()?, + devaddr: dev_addr, f_cnt: ds.n_f_cnt_down, f_ctrl: lrwn::FCtrl { adr: !self.network_conf.adr_disabled, @@ -1196,8 +1199,6 @@ impl Data { if let Some(block) = maccommand::new_channel::request(3, ¤t_channels, &wanted_channels) { - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::NewChannelReq, &block) - .await?; self.mac_commands.push(block); } @@ -1207,7 +1208,7 @@ impl Data { // Note: this must come before ADR! async fn _request_channel_mask_reconfiguration(&mut self) -> Result<()> { trace!("Requesting channel-mask reconfiguration"); - let ds = self.device.get_device_session()?; + let ds = self.device.get_device_session_mut()?; let enabled_uplink_channel_indices: Vec = ds .enabled_uplink_channel_indices @@ -1239,7 +1240,6 @@ impl Data { .collect(), ); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::LinkADRReq, &set).await?; self.mac_commands.push(set); Ok(()) @@ -1257,12 +1257,15 @@ impl Data { .get_data_rate(self.uplink_frame_set.as_ref().unwrap().dr)?; let ufs = self.uplink_frame_set.as_ref().unwrap(); - let ds = self.device.get_device_session()?; + let dev_eui = self.device.dev_eui; + let device_variables = self.device.variables.into_hashmap(); + let ds = self.device.get_device_session_mut()?; let req = adr::Request { + dev_eui, + device_variables, region_config_id: ufs.region_config_id.clone(), region_common_name: ufs.region_common_name, - dev_eui: self.device.dev_eui, mac_version: self.device_profile.mac_version, reg_params_revision: self.device_profile.reg_params_revision, adr: ds.adr, @@ -1291,7 +1294,6 @@ impl Data { max_dr: self.network_conf.max_dr, uplink_history: ds.uplink_adr_history.clone(), skip_f_cnt_check: ds.skip_f_cnt_check, - device_variables: self.device.variables.into_hashmap(), }; let resp = adr::handle(&self.device_profile.adr_algorithm_id, &req).await; @@ -1304,24 +1306,14 @@ impl Data { { let mut adr_set = false; for set in self.mac_commands.iter_mut() { - let mut is_link_adr_set = false; - for mac in &mut **set { if let lrwn::MACCommand::LinkADRReq(pl) = mac { pl.dr = resp.dr; pl.tx_power = resp.tx_power_index; pl.redundancy.nb_rep = resp.nb_trans; - adr_set = true; - is_link_adr_set = true; } } - - if is_link_adr_set { - // We need to update the pending mac-command. - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::LinkADRReq, set) - .await?; - } } // There was no existing LinkADRReq to be sent, we need to construct a new one. @@ -1358,7 +1350,6 @@ impl Data { }, )]); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::LinkADRReq, &set).await?; self.mac_commands.push(set); } } @@ -1406,7 +1397,7 @@ impl Data { async fn _request_rejoin_param_setup(&mut self) -> Result<()> { trace!("Requesting rejoin param setup"); - let ds = self.device.get_device_session()?; + let ds = self.device.get_device_session_mut()?; // Rejoin-request is disabled or device does not support LoRaWAN 1.1. if !self.network_conf.rejoin_request.enabled @@ -1423,8 +1414,6 @@ impl Data { self.network_conf.rejoin_request.max_time_n, self.network_conf.rejoin_request.max_count_n, ); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::RejoinParamSetupReq, &set) - .await?; self.mac_commands.push(set); } @@ -1434,7 +1423,7 @@ impl Data { async fn _set_ping_slot_parameters(&mut self) -> Result<()> { trace!("Setting ping-slot parameters"); - let ds = self.device.get_device_session()?; + let ds = self.device.get_device_session_mut()?; if !self.device_profile.supports_class_b { return Ok(()); @@ -1447,8 +1436,6 @@ impl Data { self.network_conf.class_b.ping_slot_dr, self.network_conf.class_b.ping_slot_frequency, ); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::PingSlotChannelReq, &set) - .await?; self.mac_commands.push(set); } @@ -1457,7 +1444,7 @@ impl Data { async fn _set_rx_parameters(&mut self) -> Result<()> { trace!("Setting rx parameters"); - let ds = self.device.get_device_session()?; + let ds = self.device.get_device_session_mut()?; if ds.rx2_frequency != self.network_conf.rx2_frequency || ds.rx2_dr as u8 != self.network_conf.rx2_dr @@ -1468,8 +1455,6 @@ impl Data { self.network_conf.rx2_frequency, self.network_conf.rx2_dr, ); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::RxParamSetupReq, &set) - .await?; self.mac_commands.push(set); } @@ -1481,8 +1466,6 @@ impl Data { if dev_rx1_delay != req_rx1_delay { let set = maccommand::rx_timing_setup::request(req_rx1_delay); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::RxTimingSetupReq, &set) - .await?; self.mac_commands.push(set); } @@ -1491,7 +1474,7 @@ impl Data { async fn _set_tx_parameters(&mut self) -> Result<()> { trace!("Setting tx parameters"); - let ds = self.device.get_device_session()?; + let ds = self.device.get_device_session_mut()?; if !self .region_conf @@ -1512,8 +1495,6 @@ impl Data { self.network_conf.downlink_dwell_time_400ms, uplink_eirp_index, ); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::TxParamSetupReq, &set) - .await?; self.mac_commands.push(set); } @@ -1567,8 +1548,8 @@ impl Data { || rd.uplink_limit_reload_rate != device.relay_ed_uplink_limit_reload_rate as u32 { - let d = device::get(&device.dev_eui).await?; - let ds = match d.get_device_session() { + let mut d = device::get(&device.dev_eui).await?; + let ds = match d.get_device_session_mut() { Ok(v) => v, Err(_) => { // It is valid that the device is no longer activated. @@ -1595,13 +1576,17 @@ impl Data { }, ), ]); - mac_command::set_pending( - &dev_eui, - lrwn::CID::UpdateUplinkListReq, - &set, + self.mac_commands.push(set); + + // Update device-session of device. + device::partial_update( + d.dev_eui, + &device::DeviceChangeset { + device_session: Some(d.device_session.clone()), + ..Default::default() + }, ) .await?; - self.mac_commands.push(set); rd.dev_addr = dev_addr.to_vec(); rd.root_wor_s_key = root_wor_s_key.to_vec(); @@ -1651,8 +1636,6 @@ impl Data { root_wor_s_key, }, )]); - mac_command::set_pending(&dev_eui, lrwn::CID::UpdateUplinkListReq, &set) - .await?; self.mac_commands.push(set); ds.relay @@ -1788,8 +1771,6 @@ impl Data { if !commands.is_empty() { let set = lrwn::MACCommandSet::new(commands); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::CtrlUplinkListReq, &set) - .await?; self.mac_commands.push(set); } @@ -1799,7 +1780,6 @@ impl Data { async fn _configure_fwd_limit_req(&mut self) -> Result<()> { trace!("Configuring Relay Fwd Limit"); - let dev_eui = self.device.dev_eui; let ds = self.device.get_device_session_mut()?; let relay_params = self.device_profile.relay_params.clone().unwrap_or_default(); @@ -1843,7 +1823,6 @@ impl Data { }, }, )]); - mac_command::set_pending(&dev_eui, lrwn::CID::ConfigureFwdLimitReq, &set).await?; self.mac_commands.push(set); } @@ -1915,7 +1894,6 @@ impl Data { } let set = lrwn::MACCommandSet::new(commands); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::FilterListReq, &set).await?; self.mac_commands.push(set); // The deletes needs to be processed before we can add new entries. @@ -1944,8 +1922,6 @@ impl Data { filter_list_eui: vec![], }, )]); - mac_command::set_pending(&self.device.dev_eui, lrwn::CID::FilterListReq, &set) - .await?; self.mac_commands.push(set); // Return because we can't add multiple sets and if we would combine @@ -1977,7 +1953,6 @@ impl Data { filter_list_eui: eui, }, )]); - mac_command::set_pending(&dev_eui, lrwn::CID::FilterListReq, &set).await?; self.mac_commands.push(set); f.join_eui = device.join_eui.to_vec(); @@ -2009,7 +1984,6 @@ impl Data { filter_list_eui: eui, }, )]); - mac_command::set_pending(&dev_eui, lrwn::CID::FilterListReq, &set).await?; self.mac_commands.push(set); ds.relay @@ -2037,7 +2011,6 @@ impl Data { async fn _update_relay_conf(&mut self) -> Result<()> { trace!("Updating Relay Conf"); - let dev_eui = self.device.dev_eui; let ds = self.device.get_device_session_mut()?; let relay_params = self.device_profile.relay_params.clone().unwrap_or_default(); @@ -2075,7 +2048,6 @@ impl Data { second_ch_freq: relay_params.second_channel_freq, }, )]); - mac_command::set_pending(&dev_eui, lrwn::CID::RelayConfReq, &set).await?; self.mac_commands.push(set); } @@ -2087,7 +2059,6 @@ impl Data { async fn _update_end_device_conf(&mut self) -> Result<()> { trace!("Updating End Device Conf"); - let dev_eui = self.device.dev_eui; let ds = self.device.get_device_session_mut()?; let relay_params = self.device_profile.relay_params.clone().unwrap_or_default(); @@ -2124,7 +2095,6 @@ impl Data { second_ch_freq: relay_params.second_channel_freq, }, )]); - mac_command::set_pending(&dev_eui, lrwn::CID::EndDeviceConfReq, &set).await?; self.mac_commands.push(set); } diff --git a/chirpstack/src/maccommand/mod.rs b/chirpstack/src/maccommand/mod.rs index 9e74174f..4a061aff 100644 --- a/chirpstack/src/maccommand/mod.rs +++ b/chirpstack/src/maccommand/mod.rs @@ -77,7 +77,7 @@ pub async fn handle_uplink( ); // Get pending mac-command block, this could return None. - let pending = match mac_command::get_pending(&dev.dev_eui, cid).await { + let pending = match mac_command::get_pending(dev.get_device_session_mut()?, cid).await { Ok(v) => v, Err(e) => { error!(dev_eui = %dev.dev_eui, cid = %cid, error = %e, "Get pending mac-command block error"); @@ -85,13 +85,6 @@ pub async fn handle_uplink( } }; - // Delete the pending mac-command. - if pending.is_some() { - if let Err(e) = mac_command::delete_pending(&dev.dev_eui, cid).await { - error!(dev_eui = %dev.dev_eui, cid = %cid, error = %e, "Delete pending mac-command error"); - } - } - // Handle the mac-command, which might return a block to answer the uplink mac-command // request. let res = match handle( diff --git a/chirpstack/src/storage/mac_command.rs b/chirpstack/src/storage/mac_command.rs index f248e719..ba5b3409 100644 --- a/chirpstack/src/storage/mac_command.rs +++ b/chirpstack/src/storage/mac_command.rs @@ -1,34 +1,24 @@ use anyhow::Result; use tracing::info; -use super::{get_async_redis_conn, redis_key}; -use crate::config; -use lrwn::EUI64; +use chirpstack_api::internal; -pub async fn set_pending(dev_eui: &EUI64, cid: lrwn::CID, set: &lrwn::MACCommandSet) -> Result<()> { - let conf = config::get(); - - let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); - let ttl = conf.network.device_session_ttl.as_millis() as usize; +pub fn set_pending(ds: &mut internal::DeviceSession, set: &lrwn::MACCommandSet) -> Result<()> { + let cid = set.cid()?; let b = set.to_vec()?; - - () = redis::cmd("PSETEX") - .arg(key) - .arg(ttl) - .arg(b) - .query_async(&mut get_async_redis_conn().await?) - .await?; - - info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block set"); + ds.mac_command_pending.insert(cid.to_u8().into(), b); + info!(cid = %cid, "Pending mac-command block set"); Ok(()) } -pub async fn get_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result> { - let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); - let b: Vec = redis::cmd("GET") - .arg(key) - .query_async(&mut get_async_redis_conn().await?) - .await?; +pub async fn get_pending( + ds: &mut internal::DeviceSession, + cid: lrwn::CID, +) -> Result> { + let b = ds + .mac_command_pending + .remove(&cid.to_u8().into()) + .unwrap_or_default(); let out = if !b.is_empty() { let mut mac = lrwn::MACCommandSet::from_slice(&b); @@ -44,49 +34,3 @@ pub async fn get_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result Result<()> { - let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8())); - - () = redis::cmd("DEL") - .arg(key) - .query_async(&mut get_async_redis_conn().await?) - .await?; - - info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block deleted"); - Ok(()) -} - -#[cfg(test)] -pub mod test { - use super::*; - use crate::test; - - #[tokio::test] - async fn test_mac_command() { - let _guard = test::prepare().await; - - let dev_eui = EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]); - let mac = lrwn::MACCommandSet::new(vec![lrwn::MACCommand::DevStatusReq]); - - // set - set_pending(&dev_eui, lrwn::CID::DevStatusReq, &mac) - .await - .unwrap(); - - // get - let mac_get = get_pending(&dev_eui, lrwn::CID::DevStatusReq) - .await - .unwrap(); - assert_eq!(mac, mac_get.unwrap()); - - // delete - delete_pending(&dev_eui, lrwn::CID::DevStatusReq) - .await - .unwrap(); - let resp = get_pending(&dev_eui, lrwn::CID::DevStatusReq) - .await - .unwrap(); - assert!(resp.is_none()); - } -} diff --git a/chirpstack/src/test/class_a_test.rs b/chirpstack/src/test/class_a_test.rs index 980500bf..b930640b 100644 --- a/chirpstack/src/test/class_a_test.rs +++ b/chirpstack/src/test/class_a_test.rs @@ -4230,33 +4230,30 @@ async fn test_lorawan_10_adr() { name: "acknowledgement of pending adr request".into(), dev_eui: dev.dev_eui, device_queue_items: vec![], - before_func: Some(Box::new(move || { - let dev_eui = dev.dev_eui; - Box::pin(async move { - mac_command::set_pending( - &dev_eui, - lrwn::CID::LinkADRReq, - &lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq( - lrwn::LinkADRReqPayload { - dr: 0, - tx_power: 3, - ch_mask: lrwn::ChMask::new([ - true, true, true, false, false, false, false, false, false, - false, false, false, false, false, false, false, - ]), - redundancy: lrwn::Redundancy { - ch_mask_cntl: 0, - nb_rep: 1, - }, - }, - )]), - ) - .await - .unwrap(); - }) - })), + before_func: None, after_func: None, - device_session: Some(ds.clone()), + device_session: Some({ + let mut ds = ds.clone(); + mac_command::set_pending( + &mut ds, + &lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq( + lrwn::LinkADRReqPayload { + dr: 0, + tx_power: 3, + ch_mask: lrwn::ChMask::new([ + true, true, true, false, false, false, false, false, false, false, + false, false, false, false, false, false, + ]), + redundancy: lrwn::Redundancy { + ch_mask_cntl: 0, + nb_rep: 1, + }, + }, + )]), + ) + .unwrap(); + ds + }), tx_info: tx_info.clone(), rx_info: rx_info.clone(), phy_payload: lrwn::PhyPayload { @@ -4298,33 +4295,30 @@ async fn test_lorawan_10_adr() { name: "negative acknowledgement of pending adr request".into(), dev_eui: dev.dev_eui, device_queue_items: vec![], - before_func: Some(Box::new(move || { - let dev_eui = dev.dev_eui; - Box::pin(async move { - mac_command::set_pending( - &dev_eui, - lrwn::CID::LinkADRReq, - &lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq( - lrwn::LinkADRReqPayload { - dr: 0, - tx_power: 3, - ch_mask: lrwn::ChMask::new([ - true, true, true, false, false, false, false, false, false, - false, false, false, false, false, false, false, - ]), - redundancy: lrwn::Redundancy { - ch_mask_cntl: 0, - nb_rep: 1, - }, - }, - )]), - ) - .await - .unwrap(); - }) - })), + before_func: None, after_func: None, - device_session: Some(ds.clone()), + device_session: Some({ + let mut ds = ds.clone(); + mac_command::set_pending( + &mut ds, + &lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq( + lrwn::LinkADRReqPayload { + dr: 0, + tx_power: 3, + ch_mask: lrwn::ChMask::new([ + true, true, true, false, false, false, false, false, false, false, + false, false, false, false, false, false, + ]), + redundancy: lrwn::Redundancy { + ch_mask_cntl: 0, + nb_rep: 1, + }, + }, + )]), + ) + .unwrap(); + ds + }), tx_info: tx_info.clone(), rx_info: rx_info.clone(), phy_payload: lrwn::PhyPayload { @@ -4540,33 +4534,30 @@ async fn test_lorawan_10_adr() { name: "new channel re-configuration ack-ed".into(), dev_eui: dev.dev_eui, device_queue_items: vec![], - before_func: Some(Box::new(move || { - let dev_eui = dev.dev_eui; - Box::pin(async move { - mac_command::set_pending( - &dev_eui, - lrwn::CID::LinkADRReq, - &lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq( - lrwn::LinkADRReqPayload { - dr: 0, - tx_power: 1, - ch_mask: lrwn::ChMask::new([ - true, true, true, false, false, false, false, false, false, - false, false, false, false, false, false, false, - ]), - redundancy: lrwn::Redundancy { - ch_mask_cntl: 0, - nb_rep: 0, - }, - }, - )]), - ) - .await - .unwrap(); - }) - })), + before_func: None, after_func: None, - device_session: Some(ds_7chan.clone()), + device_session: Some({ + let mut ds = ds_7chan.clone(); + mac_command::set_pending( + &mut ds, + &lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq( + lrwn::LinkADRReqPayload { + dr: 0, + tx_power: 1, + ch_mask: lrwn::ChMask::new([ + true, true, true, false, false, false, false, false, false, false, + false, false, false, false, false, false, + ]), + redundancy: lrwn::Redundancy { + ch_mask_cntl: 0, + nb_rep: 0, + }, + }, + )]), + ) + .unwrap(); + ds + }), tx_info: tx_info.clone(), rx_info: rx_info.clone(), phy_payload: lrwn::PhyPayload { @@ -4606,33 +4597,30 @@ async fn test_lorawan_10_adr() { name: "new channel re-configuration not ack-ed".into(), dev_eui: dev.dev_eui, device_queue_items: vec![], - before_func: Some(Box::new(move || { - let dev_eui = dev.dev_eui; - Box::pin(async move { - mac_command::set_pending( - &dev_eui, - lrwn::CID::LinkADRReq, - &lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq( - lrwn::LinkADRReqPayload { - dr: 0, - tx_power: 1, - ch_mask: lrwn::ChMask::new([ - true, true, true, false, false, false, false, false, false, - false, false, false, false, false, false, false, - ]), - redundancy: lrwn::Redundancy { - ch_mask_cntl: 0, - nb_rep: 0, - }, - }, - )]), - ) - .await - .unwrap(); - }) - })), + before_func: None, after_func: None, - device_session: Some(ds_7chan.clone()), + device_session: Some({ + let mut ds = ds_7chan.clone(); + mac_command::set_pending( + &mut ds, + &lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq( + lrwn::LinkADRReqPayload { + dr: 0, + tx_power: 1, + ch_mask: lrwn::ChMask::new([ + true, true, true, false, false, false, false, false, false, false, + false, false, false, false, false, false, + ]), + redundancy: lrwn::Redundancy { + ch_mask_cntl: 0, + nb_rep: 0, + }, + }, + )]), + ) + .unwrap(); + ds + }), tx_info: tx_info.clone(), rx_info: rx_info.clone(), phy_payload: lrwn::PhyPayload { diff --git a/lrwn/src/maccommand.rs b/lrwn/src/maccommand.rs index 58629832..0d3912fe 100644 --- a/lrwn/src/maccommand.rs +++ b/lrwn/src/maccommand.rs @@ -402,6 +402,15 @@ impl MACCommandSet { MACCommandSet(macs) } + // This reads the CID from the first mac-command in the set. It is assumed + // that all mac-commands in the set share the same CID. + pub fn cid(&self) -> Result { + self.0 + .first() + .map(|v| v.cid()) + .ok_or_else(|| anyhow!("Set is empty")) + } + pub fn size(&self) -> Result { let b = self.to_vec()?; Ok(b.len())