From 5c3624cfbe38f832ed0528e67fcfcda441256751 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Wed, 7 Feb 2024 15:06:11 +0000 Subject: [PATCH] Work-in-progress test. This is work-in-progress and only contains a partial implementation. Downlink (other than OTAA) is not yet implemented. Therefore you should disable ADR in the region_.toml config when testing. --- .../down.sql | 6 + .../up.sql | 6 + chirpstack/src/storage/device.rs | 197 ++++++++++++++++++ chirpstack/src/storage/schema.rs | 2 + chirpstack/src/test/assert.rs | 10 +- chirpstack/src/test/otaa_test.rs | 4 +- chirpstack/src/uplink/data.rs | 29 ++- chirpstack/src/uplink/join.rs | 40 ++-- shell.nix | 1 + 9 files changed, 264 insertions(+), 31 deletions(-) create mode 100644 chirpstack/migrations/2024-02-07-083424_add_device_session_to_device/down.sql create mode 100644 chirpstack/migrations/2024-02-07-083424_add_device_session_to_device/up.sql diff --git a/chirpstack/migrations/2024-02-07-083424_add_device_session_to_device/down.sql b/chirpstack/migrations/2024-02-07-083424_add_device_session_to_device/down.sql new file mode 100644 index 00000000..baf833aa --- /dev/null +++ b/chirpstack/migrations/2024-02-07-083424_add_device_session_to_device/down.sql @@ -0,0 +1,6 @@ +drop index idx_device_dev_addr; +drop index idx_device_secondary_dev_addr; + +alter table device + drop column secondary_dev_addr, + drop column device_session; diff --git a/chirpstack/migrations/2024-02-07-083424_add_device_session_to_device/up.sql b/chirpstack/migrations/2024-02-07-083424_add_device_session_to_device/up.sql new file mode 100644 index 00000000..d3f381ca --- /dev/null +++ b/chirpstack/migrations/2024-02-07-083424_add_device_session_to_device/up.sql @@ -0,0 +1,6 @@ +alter table device + add column secondary_dev_addr bytea, + add column device_session bytea; + +create index idx_device_dev_addr on device (dev_addr); +create index idx_device_secondary_dev_addr on device (secondary_dev_addr); diff --git a/chirpstack/src/storage/device.rs b/chirpstack/src/storage/device.rs index 7a380efb..4b3c8fa7 100644 --- a/chirpstack/src/storage/device.rs +++ b/chirpstack/src/storage/device.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::fmt; +use std::io::Cursor; use std::str::FromStr; use anyhow::{Context, Result}; @@ -7,15 +8,24 @@ use bigdecimal::BigDecimal; use chrono::{DateTime, Duration, Utc}; use diesel::{backend::Backend, deserialize, dsl, prelude::*, serialize, sql_types::Text}; use diesel_async::RunQueryDsl; +use prost::Message; use tracing::info; use uuid::Uuid; +use chirpstack_api::internal; use lrwn::{DevAddr, EUI64}; use super::schema::{application, device, device_profile, multicast_group_device, tenant}; use super::{error::Error, fields, get_async_db_conn}; +use crate::api::helpers::FromProto; use crate::config; +pub enum ValidationStatus { + Ok(u32, internal::DeviceSession), + Retransmission(u32, internal::DeviceSession), + Reset(u32, internal::DeviceSession), +} + #[derive(Debug, Clone, Copy, Eq, PartialEq, AsExpression, FromSqlRow)] #[diesel(sql_type = Text)] pub enum DeviceClass { @@ -95,6 +105,20 @@ pub struct Device { pub tags: fields::KeyValue, pub variables: fields::KeyValue, pub join_eui: EUI64, + pub secondary_dev_addr: Option, + pub device_session: Option>, +} + +#[derive(AsChangeset, Debug, Clone, Default)] +#[diesel(table_name = device)] +pub struct DeviceChangeset { + pub last_seen_at: Option>>, + pub dr: Option>, + pub dev_addr: Option>, + pub enabled_class: Option, + pub join_eui: Option, + pub secondary_dev_addr: Option>, + pub device_session: Option>>, } impl Device { @@ -134,6 +158,8 @@ impl Default for Device { tags: fields::KeyValue::new(HashMap::new()), variables: fields::KeyValue::new(HashMap::new()), join_eui: EUI64::default(), + secondary_dev_addr: None, + device_session: None, } } } @@ -237,6 +263,143 @@ pub async fn get(dev_eui: &EUI64) -> Result { Ok(d) } +// Return the device-session matching the given PhyPayload. This will fetch all device-session +// associated with the used DevAddr and based on f_cont and mic, decides which one to use. +// This function will increment the uplink frame-counter and will immediately update the +// device-session in the database, to make sure that in case this function is called multiple +// times, at most one will be valid. +// On Ok response, the PhyPayload f_cnt will be set to the full 32bit frame-counter based on the +// device-session context. +pub async fn get_for_phypayload_and_incr_f_cnt_up( + relayed: bool, + phy: &mut lrwn::PhyPayload, + tx_dr: u8, + tx_ch: u8, +) -> Result { + let mut dev_addr = lrwn::DevAddr::from_be_bytes([0x00, 0x00, 0x00, 0x00]); + let mut f_cnt_orig = 0; + + // Get the dev_addr and original f_cnt. + if let lrwn::Payload::MACPayload(pl) = &phy.payload { + dev_addr = pl.fhdr.devaddr; + f_cnt_orig = pl.fhdr.f_cnt; + } else { + return Err(Error::InvalidPayload("MacPayload".to_string())); + } + + let mut c = get_async_db_conn().await?; + + c.build_transaction() + .run::(|c| { + Box::pin(async move { + let devices: Vec<(EUI64, Option>)> = device::dsl::device + .select((device::dev_eui, device::device_session)) + .filter( + device::dsl::dev_addr + .eq(&dev_addr) + .or(device::dsl::secondary_dev_addr.eq(&dev_addr)), + ) + .for_update() + .load(c) + .await?; + + for d in &devices { + if d.1.is_none() { + continue; + } + + let mut ds = + internal::DeviceSession::decode(&mut Cursor::new(d.1.as_ref().unwrap()))?; + + // Get the full 32bit frame-counter. + let full_f_cnt = get_full_f_cnt_up(ds.f_cnt_up, f_cnt_orig); + let f_nwk_s_int_key = lrwn::AES128Key::from_slice(&ds.f_nwk_s_int_key)?; + let s_nwk_s_int_key = lrwn::AES128Key::from_slice(&ds.s_nwk_s_int_key)?; + + // Check both the full frame-counter and the received frame-counter + // truncated to the 16LSB. + // The latter is needed in case of a frame-counter reset as the + // GetFullFCntUp will think the 16LSB has rolled over and will + // increment the 16MSB bit. + let mut mic_ok = false; + for f_cnt in [full_f_cnt, f_cnt_orig] { + // Set the full f_cnt. + if let lrwn::Payload::MACPayload(pl) = &mut phy.payload { + pl.fhdr.f_cnt = f_cnt; + } + + mic_ok = phy + .validate_uplink_data_mic( + ds.mac_version().from_proto(), + ds.conf_f_cnt, + tx_dr, + tx_ch, + &f_nwk_s_int_key, + &s_nwk_s_int_key, + ) + .context("Validate MIC")?; + + if mic_ok { + break; + } + } + + if mic_ok { + let full_f_cnt = if let lrwn::Payload::MACPayload(pl) = &phy.payload { + pl.fhdr.f_cnt + } else { + 0 + }; + + if let Some(relay) = &ds.relay { + if !relayed && relay.ed_relay_only { + info!( + dev_eui = %d.0, + "Only communication through relay is allowed" + ); + return Err(Error::NotFound(dev_addr.to_string())); + } + } + + if full_f_cnt >= ds.f_cnt_up { + // We immediately save the device-session to make sure that concurrent calls for + // the same uplink will fail on the frame-counter validation. + let ds_f_cnt_up = ds.f_cnt_up; + ds.f_cnt_up = full_f_cnt + 1; + + let _ = diesel::update(device::dsl::device.find(&d.0)) + .set(device::device_session.eq(&ds.encode_to_vec())) + .execute(c) + .await?; + + // We do return the device-session with original frame-counter + ds.f_cnt_up = ds_f_cnt_up; + + return Ok(ValidationStatus::Ok(full_f_cnt, ds)); + } else if ds.skip_f_cnt_check { + // re-transmission or frame-counter reset + ds.f_cnt_up = 0; + return Ok(ValidationStatus::Ok(full_f_cnt, ds)); + } else if full_f_cnt == (ds.f_cnt_up - 1) { + // re-transmission, the frame-counter did not increment + return Ok(ValidationStatus::Retransmission(full_f_cnt, ds)); + } else { + return Ok(ValidationStatus::Reset(full_f_cnt, ds)); + } + } + + // Restore the original f_cnt. + if let lrwn::Payload::MACPayload(pl) = &mut phy.payload { + pl.fhdr.f_cnt = f_cnt_orig; + } + } + + Err(Error::InvalidMIC) + }) + }) + .await +} + pub async fn update(d: Device) -> Result { d.validate()?; @@ -260,6 +423,17 @@ pub async fn update(d: Device) -> Result { Ok(d) } +pub async fn partial_update(dev_eui: EUI64, d: &DeviceChangeset) -> Result { + let d = diesel::update(device::dsl::device.find(&dev_eui)) + .set(d) + .get_result::(&mut get_async_db_conn().await?) + .await + .map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?; + + info!(dev_eui = %dev_eui, "Device partially updated"); + Ok(d) +} + pub async fn set_enabled_class(dev_eui: &EUI64, mode: DeviceClass) -> Result { let d: Device = diesel::update(device::dsl::device.find(&dev_eui)) .set(device::enabled_class.eq(&mode)) @@ -530,6 +704,29 @@ pub async fn get_with_class_b_c_queue_items(limit: usize) -> Result> .context("Get with Class B/C queue-items transaction") } +// GetFullFCntUp returns the full 32bit frame-counter, given the fCntUp which +// has been truncated to the last 16 LSB. +// Notes: +// * After a succesful validation of the FCntUp and the MIC, don't forget +// to synchronize the device FCntUp with the packet FCnt. +// * In case of a frame-counter rollover, the returned values will be less +// than the given DeviceSession FCntUp. This must be validated outside this +// function! +// * In case of a re-transmission, the returned frame-counter equals +// DeviceSession.FCntUp - 1, as the FCntUp value holds the next expected +// frame-counter, not the FCntUp which was last seen. +fn get_full_f_cnt_up(next_expected_full_fcnt: u32, truncated_f_cnt: u32) -> u32 { + // Handle re-transmission. + if truncated_f_cnt == (((next_expected_full_fcnt % (1 << 16)) as u16).wrapping_sub(1)) as u32 { + return next_expected_full_fcnt - 1; + } + + let gap = ((truncated_f_cnt as u16).wrapping_sub((next_expected_full_fcnt % (1 << 16)) as u16)) + as u32; + + next_expected_full_fcnt.wrapping_add(gap) +} + #[cfg(test)] pub mod test { use super::*; diff --git a/chirpstack/src/storage/schema.rs b/chirpstack/src/storage/schema.rs index 30966b7e..b8ef82d8 100644 --- a/chirpstack/src/storage/schema.rs +++ b/chirpstack/src/storage/schema.rs @@ -63,6 +63,8 @@ diesel::table! { tags -> Jsonb, variables -> Jsonb, join_eui -> Bytea, + secondary_dev_addr -> Nullable, + device_session -> Nullable, } } diff --git a/chirpstack/src/test/assert.rs b/chirpstack/src/test/assert.rs index 06cb792b..c0c5aeb9 100644 --- a/chirpstack/src/test/assert.rs +++ b/chirpstack/src/test/assert.rs @@ -240,7 +240,11 @@ pub fn device_session(dev_eui: EUI64, ds: internal::DeviceSession) -> Validator Box::new(move || { let ds = ds.clone(); Box::pin(async move { - let ds_get = device_session::get(&dev_eui).await.unwrap(); + let d = device::get(&dev_eui).await.unwrap(); + let ds_get = internal::DeviceSession::decode(&mut Cursor::new( + d.device_session.as_ref().unwrap(), + )) + .unwrap(); assert_eq!(ds, ds_get); }) }) @@ -249,8 +253,8 @@ pub fn device_session(dev_eui: EUI64, ds: internal::DeviceSession) -> Validator pub fn no_device_session(dev_eui: EUI64) -> Validator { Box::new(move || { Box::pin(async move { - let res = device_session::get(&dev_eui).await; - assert_eq!(true, res.is_err()); + let d = device::get(&dev_eui).await.unwrap(); + assert!(d.device_session.is_none()); }) }) } diff --git a/chirpstack/src/test/otaa_test.rs b/chirpstack/src/test/otaa_test.rs index 55f2d5fd..6760b0dc 100644 --- a/chirpstack/src/test/otaa_test.rs +++ b/chirpstack/src/test/otaa_test.rs @@ -8,7 +8,7 @@ use super::assert; use crate::storage::{ application, device::{self, DeviceClass}, - device_keys, device_profile, gateway, reset_redis, tenant, + device_keys, device_profile, gateway, reset_db, reset_redis, tenant, }; use crate::{config, gateway::backend as gateway_backend, integration, region, test, uplink}; use chirpstack_api::{common, gw, internal, stream}; @@ -1224,7 +1224,7 @@ async fn test_lorawan_11() { async fn run_test(t: &Test) { println!("> {}", t.name); - reset_redis().await.unwrap(); + reset_db().await.unwrap(); let mut conf: config::Configuration = (*config::get()).clone(); for f in &t.extra_uplink_channels { diff --git a/chirpstack/src/uplink/data.rs b/chirpstack/src/uplink/data.rs index 0986b217..2c1b5081 100644 --- a/chirpstack/src/uplink/data.rs +++ b/chirpstack/src/uplink/data.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use anyhow::{Context, Result}; use chrono::{DateTime, Duration, Local, Utc}; +use prost::Message; use tracing::{debug, error, info, span, trace, warn, Instrument, Level}; use super::error::Error; @@ -50,6 +51,7 @@ pub struct Data { must_send_downlink: bool, downlink_mac_commands: Vec, device_gateway_rx_info: Option, + device_changeset: device::DeviceChangeset, } impl Data { @@ -109,6 +111,7 @@ impl Data { must_send_downlink: false, downlink_mac_commands: Vec::new(), device_gateway_rx_info: None, + device_changeset: Default::default(), }; ctx.handle_passive_roaming_device().await?; @@ -149,6 +152,7 @@ impl Data { ctx.sync_uplink_f_cnt()?; ctx.set_region_config_id()?; ctx.save_device_session().await?; + ctx.update_device().await?; ctx.handle_uplink_ack().await?; ctx.save_metrics().await?; @@ -184,6 +188,7 @@ impl Data { uplink_event: None, must_send_downlink: false, downlink_mac_commands: Vec::new(), + device_changeset: Default::default(), }; ctx.get_device_session_relayed().await?; @@ -239,7 +244,7 @@ impl Data { return Err(Error::AnyhowError(anyhow!("No MacPayload in PhyPayload"))); }; - match device_session::get_for_phypayload_and_incr_f_cnt_up( + match device::get_for_phypayload_and_incr_f_cnt_up( false, &mut self.phy_payload, self.uplink_frame_set.dr, @@ -248,16 +253,16 @@ impl Data { .await { Ok(v) => match v { - device_session::ValidationStatus::Ok(f_cnt, ds) => { + device::ValidationStatus::Ok(f_cnt, ds) => { self.device_session = Some(ds); self.f_cnt_up_full = f_cnt; } - device_session::ValidationStatus::Retransmission(f_cnt, ds) => { + device::ValidationStatus::Retransmission(f_cnt, ds) => { self.retransmission = true; self.device_session = Some(ds); self.f_cnt_up_full = f_cnt; } - device_session::ValidationStatus::Reset(f_cnt, ds) => { + device::ValidationStatus::Reset(f_cnt, ds) => { self.reset = true; self.device_session = Some(ds); self.f_cnt_up_full = f_cnt; @@ -1107,11 +1112,17 @@ impl Data { Ok(()) } - async fn save_device_session(&self) -> Result<()> { - trace!("Saving device-session"); - device_session::save(self.device_session.as_ref().unwrap()) - .await - .context("Save device-session")?; + async fn save_device_session(&mut self) -> Result<()> { + trace!("Setting device-session"); + let ds = self.device_session.as_ref().unwrap(); + self.device_changeset.device_session = Some(Some(ds.encode_to_vec())); + Ok(()) + } + + async fn update_device(&mut self) -> Result<()> { + trace!("Updating device"); + let d = self.device.as_mut().unwrap(); + *d = device::partial_update(d.dev_eui, &self.device_changeset).await?; Ok(()) } diff --git a/chirpstack/src/uplink/join.rs b/chirpstack/src/uplink/join.rs index a34a9a18..1df581ea 100644 --- a/chirpstack/src/uplink/join.rs +++ b/chirpstack/src/uplink/join.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; use chrono::{DateTime, Local, Utc}; +use prost::Message; use tracing::{error, info, span, trace, warn, Instrument, Level}; use lrwn::{ @@ -20,7 +21,6 @@ use super::{ use crate::api::{backend::get_async_receiver, helpers::ToProto}; use crate::backend::{joinserver, keywrap, roaming}; use crate::helpers::errors::PrintFullError; -use crate::storage::device_session; use crate::storage::{ application, device::{self, DeviceClass}, @@ -53,6 +53,7 @@ pub struct JoinRequest { nwk_s_enc_key: Option, app_s_key: Option, js_session_key_id: Vec, + device_changeset: device::DeviceChangeset, } impl JoinRequest { @@ -110,6 +111,7 @@ impl JoinRequest { nwk_s_enc_key: None, app_s_key: None, js_session_key_id: vec![], + device_changeset: Default::default(), }; ctx.get_join_request_payload()?; @@ -141,11 +143,12 @@ impl JoinRequest { ctx.construct_join_accept_and_set_keys()?; } ctx.log_uplink_meta().await?; - ctx.create_device_session().await?; + ctx.set_device_session().await?; ctx.flush_device_queue().await?; ctx.set_device_mode().await?; ctx.set_dev_addr().await?; ctx.set_join_eui().await?; + ctx.update_device().await?; ctx.start_downlink_join_accept_flow().await?; ctx.send_join_event().await?; @@ -173,6 +176,7 @@ impl JoinRequest { nwk_s_enc_key: None, app_s_key: None, js_session_key_id: vec![], + device_changeset: Default::default(), }; ctx.get_join_request_payload_relayed()?; @@ -193,11 +197,12 @@ impl JoinRequest { ctx.validate_dev_nonce_and_get_device_keys().await?; ctx.construct_join_accept_and_set_keys()?; } - ctx.create_device_session().await?; + ctx.set_device_session().await?; ctx.flush_device_queue().await?; ctx.set_device_mode().await?; ctx.set_dev_addr().await?; ctx.set_join_eui().await?; + ctx.update_device().await?; ctx.start_downlink_join_accept_flow_relayed().await?; ctx.send_join_event().await?; @@ -768,8 +773,8 @@ impl JoinRequest { Ok(()) } - async fn create_device_session(&mut self) -> Result<()> { - trace!("Creating device-session"); + async fn set_device_session(&mut self) -> Result<()> { + trace!("Setting device-session"); let region_conf = region::get(&self.uplink_frame_set.region_config_id)?; let region_network = config::get_region_network(&self.uplink_frame_set.region_config_id)?; @@ -847,10 +852,7 @@ impl JoinRequest { None => {} } - device_session::save(&ds) - .await - .context("Saving device-session failed")?; - + self.device_changeset.device_session = Some(Some(ds.encode_to_vec())); self.device_session = Some(ds); Ok(()) @@ -870,31 +872,35 @@ impl JoinRequest { async fn set_device_mode(&mut self) -> Result<()> { let dp = self.device_profile.as_ref().unwrap(); - let device = self.device.as_mut().unwrap(); // LoRaWAN 1.1 devices send a mac-command when changing to Class-C. if dp.supports_class_c && dp.mac_version.to_string().starts_with("1.0") { - *device = device::set_enabled_class(&device.dev_eui, DeviceClass::C).await?; + self.device_changeset.enabled_class = Some(DeviceClass::C); } else { - *device = device::set_enabled_class(&device.dev_eui, DeviceClass::A).await?; + self.device_changeset.enabled_class = Some(DeviceClass::A); } + Ok(()) } async fn set_dev_addr(&mut self) -> Result<()> { trace!("Setting DevAddr"); - let dev = self.device.as_mut().unwrap(); - *dev = device::set_dev_addr(dev.dev_eui, self.dev_addr.unwrap()).await?; + self.device_changeset.dev_addr = Some(Some(self.dev_addr.unwrap())); + self.device_changeset.secondary_dev_addr = Some(None); Ok(()) } async fn set_join_eui(&mut self) -> Result<()> { trace!("Setting JoinEUI"); - let dev = self.device.as_mut().unwrap(); let req = self.join_request.as_ref().unwrap(); + self.device_changeset.join_eui = Some(req.join_eui); + Ok(()) + } - *dev = device::set_join_eui(dev.dev_eui, req.join_eui).await?; - + async fn update_device(&mut self) -> Result<()> { + trace!("Updating device"); + let d = self.device.as_mut().unwrap(); + *d = device::partial_update(d.dev_eui, &self.device_changeset).await?; Ok(()) } diff --git a/shell.nix b/shell.nix index 2f68ed0d..a6e4983b 100644 --- a/shell.nix +++ b/shell.nix @@ -11,6 +11,7 @@ pkgs.mkShell { pkgs.perl pkgs.cmake pkgs.clang + pkgs.postgresql # needed to build the diesel cli utility ]; LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib"; BINDGEN_EXTRA_CLANG_ARGS = "-I${pkgs.llvmPackages.libclang.lib}/lib/clang/${pkgs.llvmPackages.libclang.version}/include";