Work-in-progress test.

This is work-in-progress and only contains a partial implementation.
Downlink (other than OTAA) is not yet implemented. Therefore you should
disable ADR in the region_.toml config when testing.
This commit is contained in:
Orne Brocaar 2024-02-07 15:06:11 +00:00
parent d599e7a276
commit 5c3624cfbe
9 changed files with 264 additions and 31 deletions

View File

@ -0,0 +1,6 @@
drop index idx_device_dev_addr;
drop index idx_device_secondary_dev_addr;
alter table device
drop column secondary_dev_addr,
drop column device_session;

View File

@ -0,0 +1,6 @@
alter table device
add column secondary_dev_addr bytea,
add column device_session bytea;
create index idx_device_dev_addr on device (dev_addr);
create index idx_device_secondary_dev_addr on device (secondary_dev_addr);

View File

@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fmt;
use std::io::Cursor;
use std::str::FromStr;
use anyhow::{Context, Result};
@ -7,15 +8,24 @@ use bigdecimal::BigDecimal;
use chrono::{DateTime, Duration, Utc};
use diesel::{backend::Backend, deserialize, dsl, prelude::*, serialize, sql_types::Text};
use diesel_async::RunQueryDsl;
use prost::Message;
use tracing::info;
use uuid::Uuid;
use chirpstack_api::internal;
use lrwn::{DevAddr, EUI64};
use super::schema::{application, device, device_profile, multicast_group_device, tenant};
use super::{error::Error, fields, get_async_db_conn};
use crate::api::helpers::FromProto;
use crate::config;
pub enum ValidationStatus {
Ok(u32, internal::DeviceSession),
Retransmission(u32, internal::DeviceSession),
Reset(u32, internal::DeviceSession),
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, AsExpression, FromSqlRow)]
#[diesel(sql_type = Text)]
pub enum DeviceClass {
@ -95,6 +105,20 @@ pub struct Device {
pub tags: fields::KeyValue,
pub variables: fields::KeyValue,
pub join_eui: EUI64,
pub secondary_dev_addr: Option<DevAddr>,
pub device_session: Option<Vec<u8>>,
}
#[derive(AsChangeset, Debug, Clone, Default)]
#[diesel(table_name = device)]
pub struct DeviceChangeset {
pub last_seen_at: Option<Option<DateTime<Utc>>>,
pub dr: Option<Option<i16>>,
pub dev_addr: Option<Option<DevAddr>>,
pub enabled_class: Option<DeviceClass>,
pub join_eui: Option<EUI64>,
pub secondary_dev_addr: Option<Option<DevAddr>>,
pub device_session: Option<Option<Vec<u8>>>,
}
impl Device {
@ -134,6 +158,8 @@ impl Default for Device {
tags: fields::KeyValue::new(HashMap::new()),
variables: fields::KeyValue::new(HashMap::new()),
join_eui: EUI64::default(),
secondary_dev_addr: None,
device_session: None,
}
}
}
@ -237,6 +263,143 @@ pub async fn get(dev_eui: &EUI64) -> Result<Device, Error> {
Ok(d)
}
// Return the device-session matching the given PhyPayload. This will fetch all device-session
// associated with the used DevAddr and based on f_cont and mic, decides which one to use.
// This function will increment the uplink frame-counter and will immediately update the
// device-session in the database, to make sure that in case this function is called multiple
// times, at most one will be valid.
// On Ok response, the PhyPayload f_cnt will be set to the full 32bit frame-counter based on the
// device-session context.
pub async fn get_for_phypayload_and_incr_f_cnt_up(
relayed: bool,
phy: &mut lrwn::PhyPayload,
tx_dr: u8,
tx_ch: u8,
) -> Result<ValidationStatus, Error> {
let mut dev_addr = lrwn::DevAddr::from_be_bytes([0x00, 0x00, 0x00, 0x00]);
let mut f_cnt_orig = 0;
// Get the dev_addr and original f_cnt.
if let lrwn::Payload::MACPayload(pl) = &phy.payload {
dev_addr = pl.fhdr.devaddr;
f_cnt_orig = pl.fhdr.f_cnt;
} else {
return Err(Error::InvalidPayload("MacPayload".to_string()));
}
let mut c = get_async_db_conn().await?;
c.build_transaction()
.run::<ValidationStatus, Error, _>(|c| {
Box::pin(async move {
let devices: Vec<(EUI64, Option<Vec<u8>>)> = device::dsl::device
.select((device::dev_eui, device::device_session))
.filter(
device::dsl::dev_addr
.eq(&dev_addr)
.or(device::dsl::secondary_dev_addr.eq(&dev_addr)),
)
.for_update()
.load(c)
.await?;
for d in &devices {
if d.1.is_none() {
continue;
}
let mut ds =
internal::DeviceSession::decode(&mut Cursor::new(d.1.as_ref().unwrap()))?;
// Get the full 32bit frame-counter.
let full_f_cnt = get_full_f_cnt_up(ds.f_cnt_up, f_cnt_orig);
let f_nwk_s_int_key = lrwn::AES128Key::from_slice(&ds.f_nwk_s_int_key)?;
let s_nwk_s_int_key = lrwn::AES128Key::from_slice(&ds.s_nwk_s_int_key)?;
// Check both the full frame-counter and the received frame-counter
// truncated to the 16LSB.
// The latter is needed in case of a frame-counter reset as the
// GetFullFCntUp will think the 16LSB has rolled over and will
// increment the 16MSB bit.
let mut mic_ok = false;
for f_cnt in [full_f_cnt, f_cnt_orig] {
// Set the full f_cnt.
if let lrwn::Payload::MACPayload(pl) = &mut phy.payload {
pl.fhdr.f_cnt = f_cnt;
}
mic_ok = phy
.validate_uplink_data_mic(
ds.mac_version().from_proto(),
ds.conf_f_cnt,
tx_dr,
tx_ch,
&f_nwk_s_int_key,
&s_nwk_s_int_key,
)
.context("Validate MIC")?;
if mic_ok {
break;
}
}
if mic_ok {
let full_f_cnt = if let lrwn::Payload::MACPayload(pl) = &phy.payload {
pl.fhdr.f_cnt
} else {
0
};
if let Some(relay) = &ds.relay {
if !relayed && relay.ed_relay_only {
info!(
dev_eui = %d.0,
"Only communication through relay is allowed"
);
return Err(Error::NotFound(dev_addr.to_string()));
}
}
if full_f_cnt >= ds.f_cnt_up {
// We immediately save the device-session to make sure that concurrent calls for
// the same uplink will fail on the frame-counter validation.
let ds_f_cnt_up = ds.f_cnt_up;
ds.f_cnt_up = full_f_cnt + 1;
let _ = diesel::update(device::dsl::device.find(&d.0))
.set(device::device_session.eq(&ds.encode_to_vec()))
.execute(c)
.await?;
// We do return the device-session with original frame-counter
ds.f_cnt_up = ds_f_cnt_up;
return Ok(ValidationStatus::Ok(full_f_cnt, ds));
} else if ds.skip_f_cnt_check {
// re-transmission or frame-counter reset
ds.f_cnt_up = 0;
return Ok(ValidationStatus::Ok(full_f_cnt, ds));
} else if full_f_cnt == (ds.f_cnt_up - 1) {
// re-transmission, the frame-counter did not increment
return Ok(ValidationStatus::Retransmission(full_f_cnt, ds));
} else {
return Ok(ValidationStatus::Reset(full_f_cnt, ds));
}
}
// Restore the original f_cnt.
if let lrwn::Payload::MACPayload(pl) = &mut phy.payload {
pl.fhdr.f_cnt = f_cnt_orig;
}
}
Err(Error::InvalidMIC)
})
})
.await
}
pub async fn update(d: Device) -> Result<Device, Error> {
d.validate()?;
@ -260,6 +423,17 @@ pub async fn update(d: Device) -> Result<Device, Error> {
Ok(d)
}
pub async fn partial_update(dev_eui: EUI64, d: &DeviceChangeset) -> Result<Device, Error> {
let d = diesel::update(device::dsl::device.find(&dev_eui))
.set(d)
.get_result::<Device>(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
info!(dev_eui = %dev_eui, "Device partially updated");
Ok(d)
}
pub async fn set_enabled_class(dev_eui: &EUI64, mode: DeviceClass) -> Result<Device, Error> {
let d: Device = diesel::update(device::dsl::device.find(&dev_eui))
.set(device::enabled_class.eq(&mode))
@ -530,6 +704,29 @@ pub async fn get_with_class_b_c_queue_items(limit: usize) -> Result<Vec<Device>>
.context("Get with Class B/C queue-items transaction")
}
// GetFullFCntUp returns the full 32bit frame-counter, given the fCntUp which
// has been truncated to the last 16 LSB.
// Notes:
// * After a succesful validation of the FCntUp and the MIC, don't forget
// to synchronize the device FCntUp with the packet FCnt.
// * In case of a frame-counter rollover, the returned values will be less
// than the given DeviceSession FCntUp. This must be validated outside this
// function!
// * In case of a re-transmission, the returned frame-counter equals
// DeviceSession.FCntUp - 1, as the FCntUp value holds the next expected
// frame-counter, not the FCntUp which was last seen.
fn get_full_f_cnt_up(next_expected_full_fcnt: u32, truncated_f_cnt: u32) -> u32 {
// Handle re-transmission.
if truncated_f_cnt == (((next_expected_full_fcnt % (1 << 16)) as u16).wrapping_sub(1)) as u32 {
return next_expected_full_fcnt - 1;
}
let gap = ((truncated_f_cnt as u16).wrapping_sub((next_expected_full_fcnt % (1 << 16)) as u16))
as u32;
next_expected_full_fcnt.wrapping_add(gap)
}
#[cfg(test)]
pub mod test {
use super::*;

View File

@ -63,6 +63,8 @@ diesel::table! {
tags -> Jsonb,
variables -> Jsonb,
join_eui -> Bytea,
secondary_dev_addr -> Nullable<Bytea>,
device_session -> Nullable<Bytea>,
}
}

View File

@ -240,7 +240,11 @@ pub fn device_session(dev_eui: EUI64, ds: internal::DeviceSession) -> Validator
Box::new(move || {
let ds = ds.clone();
Box::pin(async move {
let ds_get = device_session::get(&dev_eui).await.unwrap();
let d = device::get(&dev_eui).await.unwrap();
let ds_get = internal::DeviceSession::decode(&mut Cursor::new(
d.device_session.as_ref().unwrap(),
))
.unwrap();
assert_eq!(ds, ds_get);
})
})
@ -249,8 +253,8 @@ pub fn device_session(dev_eui: EUI64, ds: internal::DeviceSession) -> Validator
pub fn no_device_session(dev_eui: EUI64) -> Validator {
Box::new(move || {
Box::pin(async move {
let res = device_session::get(&dev_eui).await;
assert_eq!(true, res.is_err());
let d = device::get(&dev_eui).await.unwrap();
assert!(d.device_session.is_none());
})
})
}

View File

@ -8,7 +8,7 @@ use super::assert;
use crate::storage::{
application,
device::{self, DeviceClass},
device_keys, device_profile, gateway, reset_redis, tenant,
device_keys, device_profile, gateway, reset_db, reset_redis, tenant,
};
use crate::{config, gateway::backend as gateway_backend, integration, region, test, uplink};
use chirpstack_api::{common, gw, internal, stream};
@ -1224,7 +1224,7 @@ async fn test_lorawan_11() {
async fn run_test(t: &Test) {
println!("> {}", t.name);
reset_redis().await.unwrap();
reset_db().await.unwrap();
let mut conf: config::Configuration = (*config::get()).clone();
for f in &t.extra_uplink_channels {

View File

@ -3,6 +3,7 @@ use std::str::FromStr;
use anyhow::{Context, Result};
use chrono::{DateTime, Duration, Local, Utc};
use prost::Message;
use tracing::{debug, error, info, span, trace, warn, Instrument, Level};
use super::error::Error;
@ -50,6 +51,7 @@ pub struct Data {
must_send_downlink: bool,
downlink_mac_commands: Vec<lrwn::MACCommandSet>,
device_gateway_rx_info: Option<internal::DeviceGatewayRxInfo>,
device_changeset: device::DeviceChangeset,
}
impl Data {
@ -109,6 +111,7 @@ impl Data {
must_send_downlink: false,
downlink_mac_commands: Vec::new(),
device_gateway_rx_info: None,
device_changeset: Default::default(),
};
ctx.handle_passive_roaming_device().await?;
@ -149,6 +152,7 @@ impl Data {
ctx.sync_uplink_f_cnt()?;
ctx.set_region_config_id()?;
ctx.save_device_session().await?;
ctx.update_device().await?;
ctx.handle_uplink_ack().await?;
ctx.save_metrics().await?;
@ -184,6 +188,7 @@ impl Data {
uplink_event: None,
must_send_downlink: false,
downlink_mac_commands: Vec::new(),
device_changeset: Default::default(),
};
ctx.get_device_session_relayed().await?;
@ -239,7 +244,7 @@ impl Data {
return Err(Error::AnyhowError(anyhow!("No MacPayload in PhyPayload")));
};
match device_session::get_for_phypayload_and_incr_f_cnt_up(
match device::get_for_phypayload_and_incr_f_cnt_up(
false,
&mut self.phy_payload,
self.uplink_frame_set.dr,
@ -248,16 +253,16 @@ impl Data {
.await
{
Ok(v) => match v {
device_session::ValidationStatus::Ok(f_cnt, ds) => {
device::ValidationStatus::Ok(f_cnt, ds) => {
self.device_session = Some(ds);
self.f_cnt_up_full = f_cnt;
}
device_session::ValidationStatus::Retransmission(f_cnt, ds) => {
device::ValidationStatus::Retransmission(f_cnt, ds) => {
self.retransmission = true;
self.device_session = Some(ds);
self.f_cnt_up_full = f_cnt;
}
device_session::ValidationStatus::Reset(f_cnt, ds) => {
device::ValidationStatus::Reset(f_cnt, ds) => {
self.reset = true;
self.device_session = Some(ds);
self.f_cnt_up_full = f_cnt;
@ -1107,11 +1112,17 @@ impl Data {
Ok(())
}
async fn save_device_session(&self) -> Result<()> {
trace!("Saving device-session");
device_session::save(self.device_session.as_ref().unwrap())
.await
.context("Save device-session")?;
async fn save_device_session(&mut self) -> Result<()> {
trace!("Setting device-session");
let ds = self.device_session.as_ref().unwrap();
self.device_changeset.device_session = Some(Some(ds.encode_to_vec()));
Ok(())
}
async fn update_device(&mut self) -> Result<()> {
trace!("Updating device");
let d = self.device.as_mut().unwrap();
*d = device::partial_update(d.dev_eui, &self.device_changeset).await?;
Ok(())
}

View File

@ -3,6 +3,7 @@ use std::sync::Arc;
use anyhow::{Context, Result};
use chrono::{DateTime, Local, Utc};
use prost::Message;
use tracing::{error, info, span, trace, warn, Instrument, Level};
use lrwn::{
@ -20,7 +21,6 @@ use super::{
use crate::api::{backend::get_async_receiver, helpers::ToProto};
use crate::backend::{joinserver, keywrap, roaming};
use crate::helpers::errors::PrintFullError;
use crate::storage::device_session;
use crate::storage::{
application,
device::{self, DeviceClass},
@ -53,6 +53,7 @@ pub struct JoinRequest {
nwk_s_enc_key: Option<AES128Key>,
app_s_key: Option<common::KeyEnvelope>,
js_session_key_id: Vec<u8>,
device_changeset: device::DeviceChangeset,
}
impl JoinRequest {
@ -110,6 +111,7 @@ impl JoinRequest {
nwk_s_enc_key: None,
app_s_key: None,
js_session_key_id: vec![],
device_changeset: Default::default(),
};
ctx.get_join_request_payload()?;
@ -141,11 +143,12 @@ impl JoinRequest {
ctx.construct_join_accept_and_set_keys()?;
}
ctx.log_uplink_meta().await?;
ctx.create_device_session().await?;
ctx.set_device_session().await?;
ctx.flush_device_queue().await?;
ctx.set_device_mode().await?;
ctx.set_dev_addr().await?;
ctx.set_join_eui().await?;
ctx.update_device().await?;
ctx.start_downlink_join_accept_flow().await?;
ctx.send_join_event().await?;
@ -173,6 +176,7 @@ impl JoinRequest {
nwk_s_enc_key: None,
app_s_key: None,
js_session_key_id: vec![],
device_changeset: Default::default(),
};
ctx.get_join_request_payload_relayed()?;
@ -193,11 +197,12 @@ impl JoinRequest {
ctx.validate_dev_nonce_and_get_device_keys().await?;
ctx.construct_join_accept_and_set_keys()?;
}
ctx.create_device_session().await?;
ctx.set_device_session().await?;
ctx.flush_device_queue().await?;
ctx.set_device_mode().await?;
ctx.set_dev_addr().await?;
ctx.set_join_eui().await?;
ctx.update_device().await?;
ctx.start_downlink_join_accept_flow_relayed().await?;
ctx.send_join_event().await?;
@ -768,8 +773,8 @@ impl JoinRequest {
Ok(())
}
async fn create_device_session(&mut self) -> Result<()> {
trace!("Creating device-session");
async fn set_device_session(&mut self) -> Result<()> {
trace!("Setting device-session");
let region_conf = region::get(&self.uplink_frame_set.region_config_id)?;
let region_network = config::get_region_network(&self.uplink_frame_set.region_config_id)?;
@ -847,10 +852,7 @@ impl JoinRequest {
None => {}
}
device_session::save(&ds)
.await
.context("Saving device-session failed")?;
self.device_changeset.device_session = Some(Some(ds.encode_to_vec()));
self.device_session = Some(ds);
Ok(())
@ -870,31 +872,35 @@ impl JoinRequest {
async fn set_device_mode(&mut self) -> Result<()> {
let dp = self.device_profile.as_ref().unwrap();
let device = self.device.as_mut().unwrap();
// LoRaWAN 1.1 devices send a mac-command when changing to Class-C.
if dp.supports_class_c && dp.mac_version.to_string().starts_with("1.0") {
*device = device::set_enabled_class(&device.dev_eui, DeviceClass::C).await?;
self.device_changeset.enabled_class = Some(DeviceClass::C);
} else {
*device = device::set_enabled_class(&device.dev_eui, DeviceClass::A).await?;
self.device_changeset.enabled_class = Some(DeviceClass::A);
}
Ok(())
}
async fn set_dev_addr(&mut self) -> Result<()> {
trace!("Setting DevAddr");
let dev = self.device.as_mut().unwrap();
*dev = device::set_dev_addr(dev.dev_eui, self.dev_addr.unwrap()).await?;
self.device_changeset.dev_addr = Some(Some(self.dev_addr.unwrap()));
self.device_changeset.secondary_dev_addr = Some(None);
Ok(())
}
async fn set_join_eui(&mut self) -> Result<()> {
trace!("Setting JoinEUI");
let dev = self.device.as_mut().unwrap();
let req = self.join_request.as_ref().unwrap();
self.device_changeset.join_eui = Some(req.join_eui);
Ok(())
}
*dev = device::set_join_eui(dev.dev_eui, req.join_eui).await?;
async fn update_device(&mut self) -> Result<()> {
trace!("Updating device");
let d = self.device.as_mut().unwrap();
*d = device::partial_update(d.dev_eui, &self.device_changeset).await?;
Ok(())
}

View File

@ -11,6 +11,7 @@ pkgs.mkShell {
pkgs.perl
pkgs.cmake
pkgs.clang
pkgs.postgresql # needed to build the diesel cli utility
];
LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib";
BINDGEN_EXTRA_CLANG_ARGS = "-I${pkgs.llvmPackages.libclang.lib}/lib/clang/${pkgs.llvmPackages.libclang.version}/include";