Implement handling AppTimeReq / AppTimeAns.
Some checks failed
CI / tests (postgres) (push) Has been cancelled
CI / tests (sqlite) (push) Has been cancelled
CI / dist (postgres) (push) Has been cancelled
CI / dist (sqlite) (push) Has been cancelled

This commit is contained in:
Orne Brocaar 2025-02-18 14:08:28 +00:00
parent 8e47ea1483
commit 62e8cfb7f1
6 changed files with 358 additions and 11 deletions

10
chirpstack/src/aeskey.rs Normal file
View File

@ -0,0 +1,10 @@
use rand::RngCore;
use lrwn::AES128Key;
pub fn get_random_aes_key() -> AES128Key {
let mut rng = rand::thread_rng();
let mut key: [u8; 16] = [0; 16];
rng.fill_bytes(&mut key);
AES128Key::from_bytes(key)
}

View File

@ -0,0 +1,251 @@
use anyhow::Result;
use tracing::info;
use crate::gpstime::ToGpsTime;
use crate::storage::fields::device_profile::Ts003Version;
use crate::storage::{device, device_profile, device_queue};
use crate::uplink::helpers;
use chirpstack_api::gw;
use lrwn::applayer::clocksync;
pub async fn handle_uplink(
dev: &device::Device,
dp: &device_profile::DeviceProfile,
rx_info: &[gw::UplinkRxInfo],
data: &[u8],
) -> Result<()> {
let version = dp
.app_layer_params
.ts003_version
.ok_or_else(|| anyhow!("Device does not support TS003"))?;
match version {
Ts003Version::V100 => handle_uplink_v100(dev, dp, rx_info, data).await,
}
}
async fn handle_uplink_v100(
dev: &device::Device,
dp: &device_profile::DeviceProfile,
rx_info: &[gw::UplinkRxInfo],
data: &[u8],
) -> Result<()> {
let pl = clocksync::v1::Payload::from_slice(true, data)?;
match pl {
clocksync::v1::Payload::AppTimeReq(pl) => {
handle_v1_app_time_req(dev, dp, rx_info, pl).await?
}
_ => {}
}
Ok(())
}
async fn handle_v1_app_time_req(
dev: &device::Device,
dp: &device_profile::DeviceProfile,
rx_info: &[gw::UplinkRxInfo],
pl: clocksync::v1::AppTimeReqPayload,
) -> Result<()> {
info!("Handling AppTimeReq");
let now_time_since_gps = if let Some(t) = helpers::get_time_since_gps_epoch(rx_info) {
chrono::Duration::from_std(t)?
} else {
helpers::get_rx_timestamp_chrono(rx_info).to_gps_time()
};
let dev_time_since_gps = chrono::Duration::seconds(pl.device_time.into());
let time_diff = (now_time_since_gps - dev_time_since_gps).num_seconds();
let time_correction: i32 = if time_diff < 0 {
time_diff.try_into().unwrap_or(i32::MIN)
} else {
time_diff.try_into().unwrap_or(i32::MAX)
};
if time_diff == 0 && !pl.param.ans_required {
return Ok(());
}
info!(
time_correcrtion = time_correction,
"Responding with AppTimeAns"
);
let ans = clocksync::v1::Payload::AppTimeAns(clocksync::v1::AppTimeAnsPayload {
time_correction,
param: clocksync::v1::AppTimeAnsPayloadParam {
token_ans: pl.param.token_req,
},
});
device_queue::enqueue_item(device_queue::DeviceQueueItem {
dev_eui: dev.dev_eui,
f_port: dp.app_layer_params.ts003_f_port.into(),
data: ans.to_vec()?,
..Default::default()
})
.await?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use crate::applayer::handle_uplink;
use crate::storage::{application, device_queue, fields, tenant};
use crate::test;
use lrwn::EUI64;
use std::time::Duration;
#[tokio::test]
async fn test_handle_v1_app_time_req() {
struct Test {
name: String,
rx_info: gw::UplinkRxInfo,
req: clocksync::v1::AppTimeReqPayload,
expected: Option<clocksync::v1::AppTimeAnsPayload>,
}
let tests = vec![
Test {
name: "device synced".into(),
rx_info: gw::UplinkRxInfo {
time_since_gps_epoch: Some(Duration::from_secs(1234).try_into().unwrap()),
..Default::default()
},
req: clocksync::v1::AppTimeReqPayload {
device_time: 1234,
param: clocksync::v1::AppTimeReqPayloadParam {
token_req: 8,
ans_required: false,
},
},
expected: None,
},
Test {
name: "device synced - ans required".into(),
rx_info: gw::UplinkRxInfo {
time_since_gps_epoch: Some(Duration::from_secs(1234).try_into().unwrap()),
..Default::default()
},
req: clocksync::v1::AppTimeReqPayload {
device_time: 1234,
param: clocksync::v1::AppTimeReqPayloadParam {
token_req: 8,
ans_required: true,
},
},
expected: Some(clocksync::v1::AppTimeAnsPayload {
time_correction: 0,
param: clocksync::v1::AppTimeAnsPayloadParam { token_ans: 8 },
}),
},
Test {
name: "device not synced (positive correction)".into(),
rx_info: gw::UplinkRxInfo {
time_since_gps_epoch: Some(Duration::from_secs(1234).try_into().unwrap()),
..Default::default()
},
req: clocksync::v1::AppTimeReqPayload {
device_time: 1200,
param: clocksync::v1::AppTimeReqPayloadParam {
token_req: 8,
ans_required: false,
},
},
expected: Some(clocksync::v1::AppTimeAnsPayload {
time_correction: 34,
param: clocksync::v1::AppTimeAnsPayloadParam { token_ans: 8 },
}),
},
Test {
name: "device not synced (negative correction)".into(),
rx_info: gw::UplinkRxInfo {
time_since_gps_epoch: Some(Duration::from_secs(1200).try_into().unwrap()),
..Default::default()
},
req: clocksync::v1::AppTimeReqPayload {
device_time: 1234,
param: clocksync::v1::AppTimeReqPayloadParam {
token_req: 8,
ans_required: false,
},
},
expected: Some(clocksync::v1::AppTimeAnsPayload {
time_correction: -34,
param: clocksync::v1::AppTimeAnsPayloadParam { token_ans: 8 },
}),
},
];
let _guard = test::prepare().await;
let t = tenant::create(tenant::Tenant {
name: "test-tenant".into(),
..Default::default()
})
.await
.unwrap();
let app = application::create(application::Application {
name: "test-app".into(),
tenant_id: t.id,
..Default::default()
})
.await
.unwrap();
let dp = device_profile::create(device_profile::DeviceProfile {
name: "test-dp".into(),
tenant_id: t.id,
app_layer_params: fields::AppLayerParams {
ts003_version: Some(fields::Ts003Version::V100),
..Default::default()
},
..Default::default()
})
.await
.unwrap();
let d = device::create(device::Device {
name: "test-dev".into(),
dev_eui: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
application_id: app.id,
device_profile_id: dp.id,
..Default::default()
})
.await
.unwrap();
for tst in &tests {
println!("> {}", tst.name);
device_queue::flush_for_dev_eui(&d.dev_eui).await.unwrap();
let pl = clocksync::v1::Payload::AppTimeReq(tst.req.clone());
handle_uplink(
&d,
&dp,
&[tst.rx_info.clone()],
dp.app_layer_params.ts003_f_port,
&pl.to_vec().unwrap(),
)
.await;
let queue_items = device_queue::get_for_dev_eui(&d.dev_eui).await.unwrap();
if let Some(expected_pl) = &tst.expected {
assert_eq!(1, queue_items.len());
let qi = queue_items.first().unwrap();
assert_eq!(dp.app_layer_params.ts003_f_port as i16, qi.f_port);
let qi_pl = clocksync::v1::Payload::from_slice(false, &qi.data).unwrap();
let expected_pl = clocksync::v1::Payload::AppTimeAns(expected_pl.clone());
assert_eq!(expected_pl, qi_pl);
} else {
assert!(queue_items.is_empty());
}
}
}
}

View File

@ -0,0 +1,40 @@
use anyhow::Result;
use tracing::{span, warn, Instrument, Level};
use crate::storage::{device, device_profile};
use chirpstack_api::gw;
pub mod clocksync;
pub async fn handle_uplink(
dev: &device::Device,
dp: &device_profile::DeviceProfile,
rx_info: &[gw::UplinkRxInfo],
f_port: u8,
data: &[u8],
) {
if let Err(e) = _handle_uplink(dev, dp, rx_info, f_port, data).await {
warn!(error = %e, "Handle applayer payload error");
}
}
async fn _handle_uplink(
dev: &device::Device,
dp: &device_profile::DeviceProfile,
rx_info: &[gw::UplinkRxInfo],
f_port: u8,
data: &[u8],
) -> Result<()> {
if dp.app_layer_params.ts003_f_port == f_port {
let span = span!(Level::INFO, "ts003");
clocksync::handle_uplink(dev, dp, rx_info, data)
.instrument(span)
.await
} else if dp.app_layer_params.ts004_f_port == f_port {
unimplemented!()
} else if dp.app_layer_params.ts005_f_port == f_port {
unimplemented!()
} else {
return Err(anyhow!("Unexpected f_port {}", f_port));
}
}

View File

@ -10,7 +10,10 @@ mod uuid;
pub use big_decimal::BigDecimal;
pub use dev_nonces::DevNonces;
pub use device_profile::{AbpParams, AppLayerParams, ClassBParams, ClassCParams, RelayParams};
pub use device_profile::{
AbpParams, AppLayerParams, ClassBParams, ClassCParams, RelayParams, Ts003Version, Ts004Version,
Ts005Version,
};
pub use device_session::DeviceSession;
pub use fuota::{FuotaJob, RequestFragmentationSessionStatus};
pub use key_value::KeyValue;

View File

@ -8,6 +8,7 @@ use tracing::{debug, error, info, span, trace, warn, Instrument, Level};
use super::error::Error;
use super::{data_fns, filter_rx_info_by_tenant_id, helpers, RelayContext, UplinkFrameSet};
use crate::api::helpers::ToProto;
use crate::applayer;
use crate::backend::roaming;
use crate::helpers::errors::PrintFullError;
use crate::storage::error::Error as StorageError;
@ -140,6 +141,9 @@ impl Data {
}
ctx.append_meta_data_to_uplink_history()?;
ctx.send_uplink_event().await?;
if ctx._is_applayer() {
ctx.handle_applayer().await?;
}
ctx.detect_and_save_measurements().await?;
ctx.sync_uplink_f_cnt()?;
ctx.set_region_config_id()?;
@ -997,6 +1001,33 @@ impl Data {
Ok(())
}
async fn handle_applayer(&self) -> Result<()> {
trace!("Handling applayer protocol");
let dev = self.device.as_ref().unwrap();
let dp = self.device_profile.as_ref().unwrap();
let mac = if let lrwn::Payload::MACPayload(pl) = &self.phy_payload.payload {
pl
} else {
return Err(anyhow!("Expected MacPayload"));
};
applayer::handle_uplink(
dev,
dp,
&self.uplink_frame_set.rx_info_set,
mac.f_port.unwrap_or(0),
match &mac.frm_payload {
Some(lrwn::FRMPayload::Raw(b)) => b,
_ => &[],
},
)
.await;
Ok(())
}
async fn detect_and_save_measurements(&mut self) -> Result<()> {
trace!("Detecing and saving measurements");
@ -1424,4 +1455,16 @@ impl Data {
false
}
fn _is_applayer(&self) -> bool {
let dp = self.device_profile.as_ref().unwrap();
let mac = if let lrwn::Payload::MACPayload(pl) = &self.phy_payload.payload {
pl
} else {
return false;
};
dp.app_layer_params
.is_app_layer_f_port(mac.f_port.unwrap_or(0))
}
}

View File

@ -108,7 +108,7 @@ impl Payload {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct PackageVersionAnsPayload {
pub package_identifier: u8,
pub package_version: u8,
@ -130,7 +130,7 @@ impl PayloadCodec for PackageVersionAnsPayload {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct AppTimeReqPayload {
pub device_time: u32,
pub param: AppTimeReqPayloadParam,
@ -172,13 +172,13 @@ impl PayloadCodec for AppTimeReqPayload {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct AppTimeReqPayloadParam {
pub token_req: u8,
pub ans_required: bool,
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct AppTimeAnsPayload {
pub time_correction: i32,
pub param: AppTimeAnsPayloadParam,
@ -215,12 +215,12 @@ impl PayloadCodec for AppTimeAnsPayload {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct AppTimeAnsPayloadParam {
pub token_ans: u8,
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct DeviceAppTimePeriodicityReqPayload {
pub period: u8,
}
@ -245,7 +245,7 @@ impl PayloadCodec for DeviceAppTimePeriodicityReqPayload {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct DeviceAppTimePeriodicityAnsPayload {
pub status: DeviceAppTimePeriodicityAnsPayloadStatus,
pub time: u32,
@ -280,12 +280,12 @@ impl PayloadCodec for DeviceAppTimePeriodicityAnsPayload {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct DeviceAppTimePeriodicityAnsPayloadStatus {
pub not_supported: bool,
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct ForceDeviceResyncReqPayload {
pub force_conf: ForceDeviceResyncReqPayloadForceConf,
}
@ -312,7 +312,7 @@ impl PayloadCodec for ForceDeviceResyncReqPayload {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct ForceDeviceResyncReqPayloadForceConf {
pub nb_transmissions: u8,
}