Store temp. mac-command state in device-session.
Some checks failed
CI / tests (postgres) (push) Has been cancelled
CI / tests (sqlite) (push) Has been cancelled
CI / dist (postgres) (push) Has been cancelled
CI / dist (sqlite) (push) Has been cancelled

This commit is contained in:
Orne Brocaar 2025-05-09 14:27:25 +01:00
parent 188ef3d8f3
commit c954cd3645
7 changed files with 147 additions and 237 deletions

View File

@ -142,6 +142,9 @@ message DeviceSession {
// Relay state.
Relay relay = 41;
// Pending mac-commands.
map<uint32, bytes> mac_command_pending = 43;
}
message UplinkAdrHistory {

View File

@ -142,6 +142,9 @@ message DeviceSession {
// Relay state.
Relay relay = 41;
// Pending mac-commands.
map<uint32, bytes> mac_command_pending = 43;
}
message UplinkAdrHistory {

View File

@ -707,7 +707,8 @@ impl Data {
fn set_phy_payloads(&mut self) -> Result<()> {
trace!("Setting downlink PHYPayloads");
let mut f_pending = self.more_device_queue_items;
let ds = self.device.get_device_session()?;
let dev_addr = self.device.get_dev_addr()?;
let ds = self.device.get_device_session_mut()?;
for item in self.downlink_frame_items.iter_mut() {
let mut mac_size: usize = 0;
@ -729,6 +730,8 @@ impl Data {
for mac in &**mac_set {
mac_commands.push(mac.clone());
}
mac_command::set_pending(ds, mac_set)?;
}
// LoRaWAN MHDR
@ -740,7 +743,7 @@ impl Data {
// LoRaWAN MAC payload
let mut mac_pl = lrwn::MACPayload {
fhdr: lrwn::FHDR {
devaddr: self.device.get_dev_addr()?,
devaddr: dev_addr,
f_cnt: ds.n_f_cnt_down,
f_ctrl: lrwn::FCtrl {
adr: !self.network_conf.adr_disabled,
@ -1196,8 +1199,6 @@ impl Data {
if let Some(block) =
maccommand::new_channel::request(3, &current_channels, &wanted_channels)
{
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::NewChannelReq, &block)
.await?;
self.mac_commands.push(block);
}
@ -1207,7 +1208,7 @@ impl Data {
// Note: this must come before ADR!
async fn _request_channel_mask_reconfiguration(&mut self) -> Result<()> {
trace!("Requesting channel-mask reconfiguration");
let ds = self.device.get_device_session()?;
let ds = self.device.get_device_session_mut()?;
let enabled_uplink_channel_indices: Vec<usize> = ds
.enabled_uplink_channel_indices
@ -1239,7 +1240,6 @@ impl Data {
.collect(),
);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::LinkADRReq, &set).await?;
self.mac_commands.push(set);
Ok(())
@ -1257,12 +1257,15 @@ impl Data {
.get_data_rate(self.uplink_frame_set.as_ref().unwrap().dr)?;
let ufs = self.uplink_frame_set.as_ref().unwrap();
let ds = self.device.get_device_session()?;
let dev_eui = self.device.dev_eui;
let device_variables = self.device.variables.into_hashmap();
let ds = self.device.get_device_session_mut()?;
let req = adr::Request {
dev_eui,
device_variables,
region_config_id: ufs.region_config_id.clone(),
region_common_name: ufs.region_common_name,
dev_eui: self.device.dev_eui,
mac_version: self.device_profile.mac_version,
reg_params_revision: self.device_profile.reg_params_revision,
adr: ds.adr,
@ -1291,7 +1294,6 @@ impl Data {
max_dr: self.network_conf.max_dr,
uplink_history: ds.uplink_adr_history.clone(),
skip_f_cnt_check: ds.skip_f_cnt_check,
device_variables: self.device.variables.into_hashmap(),
};
let resp = adr::handle(&self.device_profile.adr_algorithm_id, &req).await;
@ -1304,24 +1306,14 @@ impl Data {
{
let mut adr_set = false;
for set in self.mac_commands.iter_mut() {
let mut is_link_adr_set = false;
for mac in &mut **set {
if let lrwn::MACCommand::LinkADRReq(pl) = mac {
pl.dr = resp.dr;
pl.tx_power = resp.tx_power_index;
pl.redundancy.nb_rep = resp.nb_trans;
adr_set = true;
is_link_adr_set = true;
}
}
if is_link_adr_set {
// We need to update the pending mac-command.
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::LinkADRReq, set)
.await?;
}
}
// There was no existing LinkADRReq to be sent, we need to construct a new one.
@ -1358,7 +1350,6 @@ impl Data {
},
)]);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::LinkADRReq, &set).await?;
self.mac_commands.push(set);
}
}
@ -1406,7 +1397,7 @@ impl Data {
async fn _request_rejoin_param_setup(&mut self) -> Result<()> {
trace!("Requesting rejoin param setup");
let ds = self.device.get_device_session()?;
let ds = self.device.get_device_session_mut()?;
// Rejoin-request is disabled or device does not support LoRaWAN 1.1.
if !self.network_conf.rejoin_request.enabled
@ -1423,8 +1414,6 @@ impl Data {
self.network_conf.rejoin_request.max_time_n,
self.network_conf.rejoin_request.max_count_n,
);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::RejoinParamSetupReq, &set)
.await?;
self.mac_commands.push(set);
}
@ -1434,7 +1423,7 @@ impl Data {
async fn _set_ping_slot_parameters(&mut self) -> Result<()> {
trace!("Setting ping-slot parameters");
let ds = self.device.get_device_session()?;
let ds = self.device.get_device_session_mut()?;
if !self.device_profile.supports_class_b {
return Ok(());
@ -1447,8 +1436,6 @@ impl Data {
self.network_conf.class_b.ping_slot_dr,
self.network_conf.class_b.ping_slot_frequency,
);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::PingSlotChannelReq, &set)
.await?;
self.mac_commands.push(set);
}
@ -1457,7 +1444,7 @@ impl Data {
async fn _set_rx_parameters(&mut self) -> Result<()> {
trace!("Setting rx parameters");
let ds = self.device.get_device_session()?;
let ds = self.device.get_device_session_mut()?;
if ds.rx2_frequency != self.network_conf.rx2_frequency
|| ds.rx2_dr as u8 != self.network_conf.rx2_dr
@ -1468,8 +1455,6 @@ impl Data {
self.network_conf.rx2_frequency,
self.network_conf.rx2_dr,
);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::RxParamSetupReq, &set)
.await?;
self.mac_commands.push(set);
}
@ -1481,8 +1466,6 @@ impl Data {
if dev_rx1_delay != req_rx1_delay {
let set = maccommand::rx_timing_setup::request(req_rx1_delay);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::RxTimingSetupReq, &set)
.await?;
self.mac_commands.push(set);
}
@ -1491,7 +1474,7 @@ impl Data {
async fn _set_tx_parameters(&mut self) -> Result<()> {
trace!("Setting tx parameters");
let ds = self.device.get_device_session()?;
let ds = self.device.get_device_session_mut()?;
if !self
.region_conf
@ -1512,8 +1495,6 @@ impl Data {
self.network_conf.downlink_dwell_time_400ms,
uplink_eirp_index,
);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::TxParamSetupReq, &set)
.await?;
self.mac_commands.push(set);
}
@ -1567,8 +1548,8 @@ impl Data {
|| rd.uplink_limit_reload_rate
!= device.relay_ed_uplink_limit_reload_rate as u32
{
let d = device::get(&device.dev_eui).await?;
let ds = match d.get_device_session() {
let mut d = device::get(&device.dev_eui).await?;
let ds = match d.get_device_session_mut() {
Ok(v) => v,
Err(_) => {
// It is valid that the device is no longer activated.
@ -1595,13 +1576,17 @@ impl Data {
},
),
]);
mac_command::set_pending(
&dev_eui,
lrwn::CID::UpdateUplinkListReq,
&set,
self.mac_commands.push(set);
// Update device-session of device.
device::partial_update(
d.dev_eui,
&device::DeviceChangeset {
device_session: Some(d.device_session.clone()),
..Default::default()
},
)
.await?;
self.mac_commands.push(set);
rd.dev_addr = dev_addr.to_vec();
rd.root_wor_s_key = root_wor_s_key.to_vec();
@ -1651,8 +1636,6 @@ impl Data {
root_wor_s_key,
},
)]);
mac_command::set_pending(&dev_eui, lrwn::CID::UpdateUplinkListReq, &set)
.await?;
self.mac_commands.push(set);
ds.relay
@ -1788,8 +1771,6 @@ impl Data {
if !commands.is_empty() {
let set = lrwn::MACCommandSet::new(commands);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::CtrlUplinkListReq, &set)
.await?;
self.mac_commands.push(set);
}
@ -1799,7 +1780,6 @@ impl Data {
async fn _configure_fwd_limit_req(&mut self) -> Result<()> {
trace!("Configuring Relay Fwd Limit");
let dev_eui = self.device.dev_eui;
let ds = self.device.get_device_session_mut()?;
let relay_params = self.device_profile.relay_params.clone().unwrap_or_default();
@ -1843,7 +1823,6 @@ impl Data {
},
},
)]);
mac_command::set_pending(&dev_eui, lrwn::CID::ConfigureFwdLimitReq, &set).await?;
self.mac_commands.push(set);
}
@ -1915,7 +1894,6 @@ impl Data {
}
let set = lrwn::MACCommandSet::new(commands);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::FilterListReq, &set).await?;
self.mac_commands.push(set);
// The deletes needs to be processed before we can add new entries.
@ -1944,8 +1922,6 @@ impl Data {
filter_list_eui: vec![],
},
)]);
mac_command::set_pending(&self.device.dev_eui, lrwn::CID::FilterListReq, &set)
.await?;
self.mac_commands.push(set);
// Return because we can't add multiple sets and if we would combine
@ -1977,7 +1953,6 @@ impl Data {
filter_list_eui: eui,
},
)]);
mac_command::set_pending(&dev_eui, lrwn::CID::FilterListReq, &set).await?;
self.mac_commands.push(set);
f.join_eui = device.join_eui.to_vec();
@ -2009,7 +1984,6 @@ impl Data {
filter_list_eui: eui,
},
)]);
mac_command::set_pending(&dev_eui, lrwn::CID::FilterListReq, &set).await?;
self.mac_commands.push(set);
ds.relay
@ -2037,7 +2011,6 @@ impl Data {
async fn _update_relay_conf(&mut self) -> Result<()> {
trace!("Updating Relay Conf");
let dev_eui = self.device.dev_eui;
let ds = self.device.get_device_session_mut()?;
let relay_params = self.device_profile.relay_params.clone().unwrap_or_default();
@ -2075,7 +2048,6 @@ impl Data {
second_ch_freq: relay_params.second_channel_freq,
},
)]);
mac_command::set_pending(&dev_eui, lrwn::CID::RelayConfReq, &set).await?;
self.mac_commands.push(set);
}
@ -2087,7 +2059,6 @@ impl Data {
async fn _update_end_device_conf(&mut self) -> Result<()> {
trace!("Updating End Device Conf");
let dev_eui = self.device.dev_eui;
let ds = self.device.get_device_session_mut()?;
let relay_params = self.device_profile.relay_params.clone().unwrap_or_default();
@ -2124,7 +2095,6 @@ impl Data {
second_ch_freq: relay_params.second_channel_freq,
},
)]);
mac_command::set_pending(&dev_eui, lrwn::CID::EndDeviceConfReq, &set).await?;
self.mac_commands.push(set);
}

View File

@ -77,7 +77,7 @@ pub async fn handle_uplink(
);
// Get pending mac-command block, this could return None.
let pending = match mac_command::get_pending(&dev.dev_eui, cid).await {
let pending = match mac_command::get_pending(dev.get_device_session_mut()?, cid).await {
Ok(v) => v,
Err(e) => {
error!(dev_eui = %dev.dev_eui, cid = %cid, error = %e, "Get pending mac-command block error");
@ -85,13 +85,6 @@ pub async fn handle_uplink(
}
};
// Delete the pending mac-command.
if pending.is_some() {
if let Err(e) = mac_command::delete_pending(&dev.dev_eui, cid).await {
error!(dev_eui = %dev.dev_eui, cid = %cid, error = %e, "Delete pending mac-command error");
}
}
// Handle the mac-command, which might return a block to answer the uplink mac-command
// request.
let res = match handle(

View File

@ -1,34 +1,24 @@
use anyhow::Result;
use tracing::info;
use super::{get_async_redis_conn, redis_key};
use crate::config;
use lrwn::EUI64;
use chirpstack_api::internal;
pub async fn set_pending(dev_eui: &EUI64, cid: lrwn::CID, set: &lrwn::MACCommandSet) -> Result<()> {
let conf = config::get();
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
let ttl = conf.network.device_session_ttl.as_millis() as usize;
pub fn set_pending(ds: &mut internal::DeviceSession, set: &lrwn::MACCommandSet) -> Result<()> {
let cid = set.cid()?;
let b = set.to_vec()?;
() = redis::cmd("PSETEX")
.arg(key)
.arg(ttl)
.arg(b)
.query_async(&mut get_async_redis_conn().await?)
.await?;
info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block set");
ds.mac_command_pending.insert(cid.to_u8().into(), b);
info!(cid = %cid, "Pending mac-command block set");
Ok(())
}
pub async fn get_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<Option<lrwn::MACCommandSet>> {
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
let b: Vec<u8> = redis::cmd("GET")
.arg(key)
.query_async(&mut get_async_redis_conn().await?)
.await?;
pub async fn get_pending(
ds: &mut internal::DeviceSession,
cid: lrwn::CID,
) -> Result<Option<lrwn::MACCommandSet>> {
let b = ds
.mac_command_pending
.remove(&cid.to_u8().into())
.unwrap_or_default();
let out = if !b.is_empty() {
let mut mac = lrwn::MACCommandSet::from_slice(&b);
@ -44,49 +34,3 @@ pub async fn get_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<Option<lrwn:
Ok(out)
}
pub async fn delete_pending(dev_eui: &EUI64, cid: lrwn::CID) -> Result<()> {
let key = redis_key(format!("device:{}:mac:pending:{}", dev_eui, cid.to_u8()));
() = redis::cmd("DEL")
.arg(key)
.query_async(&mut get_async_redis_conn().await?)
.await?;
info!(dev_eui = %dev_eui, cid = %cid, "Pending mac-command block deleted");
Ok(())
}
#[cfg(test)]
pub mod test {
use super::*;
use crate::test;
#[tokio::test]
async fn test_mac_command() {
let _guard = test::prepare().await;
let dev_eui = EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
let mac = lrwn::MACCommandSet::new(vec![lrwn::MACCommand::DevStatusReq]);
// set
set_pending(&dev_eui, lrwn::CID::DevStatusReq, &mac)
.await
.unwrap();
// get
let mac_get = get_pending(&dev_eui, lrwn::CID::DevStatusReq)
.await
.unwrap();
assert_eq!(mac, mac_get.unwrap());
// delete
delete_pending(&dev_eui, lrwn::CID::DevStatusReq)
.await
.unwrap();
let resp = get_pending(&dev_eui, lrwn::CID::DevStatusReq)
.await
.unwrap();
assert!(resp.is_none());
}
}

View File

@ -4230,33 +4230,30 @@ async fn test_lorawan_10_adr() {
name: "acknowledgement of pending adr request".into(),
dev_eui: dev.dev_eui,
device_queue_items: vec![],
before_func: Some(Box::new(move || {
let dev_eui = dev.dev_eui;
Box::pin(async move {
mac_command::set_pending(
&dev_eui,
lrwn::CID::LinkADRReq,
&lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq(
lrwn::LinkADRReqPayload {
dr: 0,
tx_power: 3,
ch_mask: lrwn::ChMask::new([
true, true, true, false, false, false, false, false, false,
false, false, false, false, false, false, false,
]),
redundancy: lrwn::Redundancy {
ch_mask_cntl: 0,
nb_rep: 1,
},
},
)]),
)
.await
.unwrap();
})
})),
before_func: None,
after_func: None,
device_session: Some(ds.clone()),
device_session: Some({
let mut ds = ds.clone();
mac_command::set_pending(
&mut ds,
&lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq(
lrwn::LinkADRReqPayload {
dr: 0,
tx_power: 3,
ch_mask: lrwn::ChMask::new([
true, true, true, false, false, false, false, false, false, false,
false, false, false, false, false, false,
]),
redundancy: lrwn::Redundancy {
ch_mask_cntl: 0,
nb_rep: 1,
},
},
)]),
)
.unwrap();
ds
}),
tx_info: tx_info.clone(),
rx_info: rx_info.clone(),
phy_payload: lrwn::PhyPayload {
@ -4298,33 +4295,30 @@ async fn test_lorawan_10_adr() {
name: "negative acknowledgement of pending adr request".into(),
dev_eui: dev.dev_eui,
device_queue_items: vec![],
before_func: Some(Box::new(move || {
let dev_eui = dev.dev_eui;
Box::pin(async move {
mac_command::set_pending(
&dev_eui,
lrwn::CID::LinkADRReq,
&lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq(
lrwn::LinkADRReqPayload {
dr: 0,
tx_power: 3,
ch_mask: lrwn::ChMask::new([
true, true, true, false, false, false, false, false, false,
false, false, false, false, false, false, false,
]),
redundancy: lrwn::Redundancy {
ch_mask_cntl: 0,
nb_rep: 1,
},
},
)]),
)
.await
.unwrap();
})
})),
before_func: None,
after_func: None,
device_session: Some(ds.clone()),
device_session: Some({
let mut ds = ds.clone();
mac_command::set_pending(
&mut ds,
&lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq(
lrwn::LinkADRReqPayload {
dr: 0,
tx_power: 3,
ch_mask: lrwn::ChMask::new([
true, true, true, false, false, false, false, false, false, false,
false, false, false, false, false, false,
]),
redundancy: lrwn::Redundancy {
ch_mask_cntl: 0,
nb_rep: 1,
},
},
)]),
)
.unwrap();
ds
}),
tx_info: tx_info.clone(),
rx_info: rx_info.clone(),
phy_payload: lrwn::PhyPayload {
@ -4540,33 +4534,30 @@ async fn test_lorawan_10_adr() {
name: "new channel re-configuration ack-ed".into(),
dev_eui: dev.dev_eui,
device_queue_items: vec![],
before_func: Some(Box::new(move || {
let dev_eui = dev.dev_eui;
Box::pin(async move {
mac_command::set_pending(
&dev_eui,
lrwn::CID::LinkADRReq,
&lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq(
lrwn::LinkADRReqPayload {
dr: 0,
tx_power: 1,
ch_mask: lrwn::ChMask::new([
true, true, true, false, false, false, false, false, false,
false, false, false, false, false, false, false,
]),
redundancy: lrwn::Redundancy {
ch_mask_cntl: 0,
nb_rep: 0,
},
},
)]),
)
.await
.unwrap();
})
})),
before_func: None,
after_func: None,
device_session: Some(ds_7chan.clone()),
device_session: Some({
let mut ds = ds_7chan.clone();
mac_command::set_pending(
&mut ds,
&lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq(
lrwn::LinkADRReqPayload {
dr: 0,
tx_power: 1,
ch_mask: lrwn::ChMask::new([
true, true, true, false, false, false, false, false, false, false,
false, false, false, false, false, false,
]),
redundancy: lrwn::Redundancy {
ch_mask_cntl: 0,
nb_rep: 0,
},
},
)]),
)
.unwrap();
ds
}),
tx_info: tx_info.clone(),
rx_info: rx_info.clone(),
phy_payload: lrwn::PhyPayload {
@ -4606,33 +4597,30 @@ async fn test_lorawan_10_adr() {
name: "new channel re-configuration not ack-ed".into(),
dev_eui: dev.dev_eui,
device_queue_items: vec![],
before_func: Some(Box::new(move || {
let dev_eui = dev.dev_eui;
Box::pin(async move {
mac_command::set_pending(
&dev_eui,
lrwn::CID::LinkADRReq,
&lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq(
lrwn::LinkADRReqPayload {
dr: 0,
tx_power: 1,
ch_mask: lrwn::ChMask::new([
true, true, true, false, false, false, false, false, false,
false, false, false, false, false, false, false,
]),
redundancy: lrwn::Redundancy {
ch_mask_cntl: 0,
nb_rep: 0,
},
},
)]),
)
.await
.unwrap();
})
})),
before_func: None,
after_func: None,
device_session: Some(ds_7chan.clone()),
device_session: Some({
let mut ds = ds_7chan.clone();
mac_command::set_pending(
&mut ds,
&lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRReq(
lrwn::LinkADRReqPayload {
dr: 0,
tx_power: 1,
ch_mask: lrwn::ChMask::new([
true, true, true, false, false, false, false, false, false, false,
false, false, false, false, false, false,
]),
redundancy: lrwn::Redundancy {
ch_mask_cntl: 0,
nb_rep: 0,
},
},
)]),
)
.unwrap();
ds
}),
tx_info: tx_info.clone(),
rx_info: rx_info.clone(),
phy_payload: lrwn::PhyPayload {

View File

@ -402,6 +402,15 @@ impl MACCommandSet {
MACCommandSet(macs)
}
// This reads the CID from the first mac-command in the set. It is assumed
// that all mac-commands in the set share the same CID.
pub fn cid(&self) -> Result<CID> {
self.0
.first()
.map(|v| v.cid())
.ok_or_else(|| anyhow!("Set is empty"))
}
pub fn size(&self) -> Result<usize> {
let b = self.to_vec()?;
Ok(b.len())