From 922a83597f8aaed089f5c6ed610d20c5aaa91102 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Fri, 29 Nov 2024 15:48:03 +0000 Subject: [PATCH] Refactor handling same uplink under multiple regions. --- chirpstack/src/downlink/tx_ack.rs | 2 +- chirpstack/src/gateway/backend/mqtt.rs | 19 +- chirpstack/src/storage/device.rs | 20 +- chirpstack/src/storage/downlink_frame.rs | 6 +- chirpstack/src/test/assert.rs | 2 +- chirpstack/src/test/class_a_pr_test.rs | 11 +- chirpstack/src/test/class_a_test.rs | 304 ++-------------------- chirpstack/src/test/class_b_test.rs | 11 +- chirpstack/src/test/otaa_js_test.rs | 11 +- chirpstack/src/test/otaa_pr_test.rs | 11 +- chirpstack/src/test/otaa_test.rs | 35 +-- chirpstack/src/test/relay_class_a_test.rs | 11 +- chirpstack/src/test/relay_otaa_test.rs | 11 +- chirpstack/src/uplink/data.rs | 28 +- chirpstack/src/uplink/join.rs | 32 +-- chirpstack/src/uplink/mod.rs | 94 +++---- 16 files changed, 141 insertions(+), 467 deletions(-) diff --git a/chirpstack/src/downlink/tx_ack.rs b/chirpstack/src/downlink/tx_ack.rs index 61449320..0ea6f838 100644 --- a/chirpstack/src/downlink/tx_ack.rs +++ b/chirpstack/src/downlink/tx_ack.rs @@ -182,7 +182,7 @@ impl TxAck { async fn get_downlink_frame(&mut self) -> Result<()> { trace!("Get downlink-frame from Redis"); - let df = downlink_frame::get(self.downlink_id).await?; + let df = downlink_frame::get_and_del(self.downlink_id).await?; let gw_df = &df .downlink_frame .as_ref() diff --git a/chirpstack/src/gateway/backend/mqtt.rs b/chirpstack/src/gateway/backend/mqtt.rs index a3b964ef..6e87bd8e 100644 --- a/chirpstack/src/gateway/backend/mqtt.rs +++ b/chirpstack/src/gateway/backend/mqtt.rs @@ -5,7 +5,6 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use chrono::Utc; use handlebars::Handlebars; use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; @@ -360,19 +359,11 @@ async fn message_callback( event.v4_migrate(); } - if let Some(rx_info) = &mut event.rx_info { - set_gateway_json(&rx_info.gateway_id, json); - rx_info.ns_time = Some(Utc::now().into()); - rx_info - .metadata - .insert("region_config_id".to_string(), region_config_id.to_string()); - rx_info.metadata.insert( - "region_common_name".to_string(), - region_common_name.to_string(), - ); - } - - tokio::spawn(uplink::deduplicate_uplink(event)); + tokio::spawn(uplink::deduplicate_uplink( + region_common_name, + region_config_id.to_string(), + event, + )); } else if topic.ends_with("/stats") { EVENT_COUNTER .get_or_create(&EventLabels { diff --git a/chirpstack/src/storage/device.rs b/chirpstack/src/storage/device.rs index 5071dc93..0a61352f 100644 --- a/chirpstack/src/storage/device.rs +++ b/chirpstack/src/storage/device.rs @@ -299,6 +299,7 @@ pub async fn get(dev_eui: &EUI64) -> Result { // On Ok response, the PhyPayload f_cnt will be set to the full 32bit frame-counter based on the // device-session context. pub async fn get_for_phypayload_and_incr_f_cnt_up( + region_config_id: &str, relayed: bool, phy: &mut lrwn::PhyPayload, tx_dr: u8, @@ -341,7 +342,17 @@ pub async fn get_for_phypayload_and_incr_f_cnt_up( } for ds in &mut sessions { - if ds.dev_addr != dev_addr.to_vec() { + // Set the region_config_id if it is empty, e.g. after a ChirpStack v3 to + // ChirpStack v4 migration. + if ds.region_config_id.is_empty() { + ds.region_config_id = region_config_id.into(); + } + // Check that the DevAddr and region_config_id are equal. + // The latter is needed because we must assure that the uplink was received + // under the same region as the device was activated. In case the uplink was + // received under two region configurations, this will start two uplink flows, + // each with their own region_config_id associated. + if ds.region_config_id != region_config_id || ds.dev_addr != dev_addr.to_vec() { continue; } @@ -1162,6 +1173,7 @@ pub mod test { dev_addr: Some(DevAddr::from_be_bytes([1, 2, 3, 4])), device_session: Some( internal::DeviceSession { + region_config_id: "eu868".into(), dev_addr: vec![0x01, 0x02, 0x03, 0x04], s_nwk_s_int_key: vec![ 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, @@ -1191,6 +1203,7 @@ pub mod test { dev_addr: Some(DevAddr::from_be_bytes([1, 2, 3, 4])), device_session: Some( internal::DeviceSession { + region_config_id: "eu868".into(), dev_addr: vec![0x01, 0x02, 0x03, 0x04], s_nwk_s_int_key: vec![ 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, @@ -1220,6 +1233,7 @@ pub mod test { secondary_dev_addr: Some(DevAddr::from_be_bytes([4, 3, 2, 1])), device_session: Some( internal::DeviceSession { + region_config_id: "eu868".into(), dev_addr: vec![0x01, 0x02, 0x03, 0x04], s_nwk_s_int_key: vec![ 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, @@ -1235,6 +1249,7 @@ pub mod test { ], f_cnt_up: 300, pending_rejoin_device_session: Some(Box::new(internal::DeviceSession { + region_config_id: "eu868".into(), dev_addr: vec![0x04, 0x03, 0x02, 0x01], s_nwk_s_int_key: vec![ 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, @@ -1265,6 +1280,7 @@ pub mod test { dev_addr: Some(DevAddr::from_be_bytes([1, 2, 3, 4])), device_session: Some( internal::DeviceSession { + region_config_id: "eu868".into(), dev_addr: vec![0x01, 0x02, 0x03, 0x04], s_nwk_s_int_key: vec![ 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, @@ -1476,7 +1492,7 @@ pub mod test { pl.fhdr.f_cnt = tst.f_cnt % (1 << 16); } - let d = get_for_phypayload_and_incr_f_cnt_up(false, &mut phy, 0, 0).await; + let d = get_for_phypayload_and_incr_f_cnt_up("eu868", false, &mut phy, 0, 0).await; if tst.expected_error.is_some() { assert!(d.is_err()); assert_eq!( diff --git a/chirpstack/src/storage/downlink_frame.rs b/chirpstack/src/storage/downlink_frame.rs index b7c39bb7..4cfa5f5d 100644 --- a/chirpstack/src/storage/downlink_frame.rs +++ b/chirpstack/src/storage/downlink_frame.rs @@ -22,9 +22,9 @@ pub async fn save(df: &internal::DownlinkFrame) -> Result<()> { Ok(()) } -pub async fn get(id: u32) -> Result { +pub async fn get_and_del(id: u32) -> Result { let key = redis_key(format!("frame:{}", id)); - let v: Vec = redis::cmd("GET") + let v: Vec = redis::cmd("GETDEL") .arg(key) .query_async(&mut get_async_redis_conn().await?) .await?; @@ -53,7 +53,7 @@ pub mod test { }; save(&df).await.unwrap(); - let df_get = get(12345).await.unwrap(); + let df_get = get_and_del(12345).await.unwrap(); assert_eq!(df, df_get); } } diff --git a/chirpstack/src/test/assert.rs b/chirpstack/src/test/assert.rs index 50c80c11..95b0923f 100644 --- a/chirpstack/src/test/assert.rs +++ b/chirpstack/src/test/assert.rs @@ -344,7 +344,7 @@ pub fn downlink_frame_saved(df: internal::DownlinkFrame) -> Validator { Box::new(move || { let df = df.clone(); Box::pin(async move { - let mut df_get = downlink_frame::get(*LAST_DOWNLINK_ID.read().await) + let mut df_get = downlink_frame::get_and_del(*LAST_DOWNLINK_ID.read().await) .await .unwrap(); diff --git a/chirpstack/src/test/class_a_pr_test.rs b/chirpstack/src/test/class_a_pr_test.rs index c60f8a2e..7ab5a2bc 100644 --- a/chirpstack/src/test/class_a_pr_test.rs +++ b/chirpstack/src/test/class_a_pr_test.rs @@ -16,6 +16,7 @@ use crate::storage::{ }; use crate::{config, test, uplink}; use chirpstack_api::{common, gw, internal}; +use lrwn::region::CommonName; use lrwn::{AES128Key, NetID, EUI64}; #[tokio::test] @@ -59,7 +60,7 @@ async fn test_fns_uplink() { let recv_time = Utc::now(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), gw_time: Some(recv_time.into()), location: Some(common::Location { @@ -70,12 +71,6 @@ async fn test_fns_uplink() { }), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -154,6 +149,8 @@ async fn test_fns_uplink() { // Simulate uplink uplink::handle_uplink( + CommonName::EU868, + "eu868".into(), Uuid::new_v4(), gw::UplinkFrameSet { phy_payload: data_phy.to_vec().unwrap(), diff --git a/chirpstack/src/test/class_a_test.rs b/chirpstack/src/test/class_a_test.rs index b338186c..e15dfff9 100644 --- a/chirpstack/src/test/class_a_test.rs +++ b/chirpstack/src/test/class_a_test.rs @@ -12,6 +12,7 @@ use crate::storage::{ }; use crate::{config, gateway::backend as gateway_backend, integration, region, test, uplink}; use chirpstack_api::{common, gw, integration as integration_pb, internal, stream}; +use lrwn::region::CommonName; use lrwn::{AES128Key, DevAddr, EUI64}; type Function = Box Pin>>>; @@ -119,29 +120,17 @@ async fn test_gateway_filtering() { let ds = dev.get_device_session().unwrap(); - let mut rx_info_a = gw::UplinkRxInfo { + let rx_info_a = gw::UplinkRxInfo { gateway_id: gw_a.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info_a - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info_a - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); - let mut rx_info_b = gw::UplinkRxInfo { + let rx_info_b = gw::UplinkRxInfo { gateway_id: gw_b.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info_b - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info_b - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -211,171 +200,6 @@ async fn test_gateway_filtering() { } } -#[tokio::test] -async fn test_region_config_id_filtering() { - let _guard = test::prepare().await; - - // We need to configure the eu868_other region. - let region_conf = lrwn::region::get(lrwn::region::CommonName::EU868, false, false); - region::set("eu868_other", region_conf); - - let t = tenant::create(tenant::Tenant { - name: "tenant".into(), - can_have_gateways: true, - ..Default::default() - }) - .await - .unwrap(); - - let gw = gateway::create(gateway::Gateway { - name: "test-gw".into(), - tenant_id: t.id, - gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]), - ..Default::default() - }) - .await - .unwrap(); - - let app = application::create(application::Application { - name: "app".into(), - tenant_id: t.id, - ..Default::default() - }) - .await - .unwrap(); - - let dp = device_profile::create(device_profile::DeviceProfile { - name: "test-dp".into(), - tenant_id: t.id, - region: lrwn::region::CommonName::EU868, - region_config_id: Some("eu868".to_string()), - mac_version: lrwn::region::MacVersion::LORAWAN_1_0_2, - reg_params_revision: lrwn::region::Revision::A, - supports_otaa: true, - ..Default::default() - }) - .await - .unwrap(); - - let dev = device::create(device::Device { - name: "device".into(), - application_id: app.id, - device_profile_id: dp.id, - dev_eui: EUI64::from_be_bytes([2, 2, 3, 4, 5, 6, 7, 8]), - enabled_class: DeviceClass::A, - dev_addr: Some(DevAddr::from_be_bytes([1, 2, 3, 4])), - ..Default::default() - }) - .await - .unwrap(); - - let mut rx_info_ok = gw::UplinkRxInfo { - gateway_id: gw.gateway_id.to_string(), - location: Some(Default::default()), - ..Default::default() - }; - rx_info_ok - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info_ok - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); - - let mut rx_info_invalid = gw::UplinkRxInfo { - gateway_id: gw.gateway_id.to_string(), - location: Some(Default::default()), - ..Default::default() - }; - rx_info_invalid - .metadata - .insert("region_config_id".to_string(), "eu868_other".to_string()); - rx_info_invalid - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); - - let mut tx_info = gw::UplinkTxInfo { - frequency: 868100000, - ..Default::default() - }; - uplink::helpers::set_uplink_modulation("eu868", &mut tx_info, 0).unwrap(); - - let ds = internal::DeviceSession { - mac_version: common::MacVersion::Lorawan102.into(), - dev_addr: vec![1, 2, 3, 4], - f_nwk_s_int_key: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], - s_nwk_s_int_key: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], - nwk_s_enc_key: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], - f_cnt_up: 7, - n_f_cnt_down: 5, - enabled_uplink_channel_indices: vec![0, 1, 2], - rx1_delay: 1, - rx2_frequency: 869525000, - region_config_id: "eu868".into(), - ..Default::default() - }; - - let tests = vec![ - Test { - name: "matching config id".into(), - dev_eui: dev.dev_eui, - device_queue_items: vec![], - before_func: None, - after_func: None, - device_session: Some(ds.clone()), - tx_info: tx_info.clone(), - rx_info: rx_info_ok.clone(), - phy_payload: lrwn::PhyPayload { - mhdr: lrwn::MHDR { - m_type: lrwn::MType::UnconfirmedDataUp, - major: lrwn::Major::LoRaWANR1, - }, - payload: lrwn::Payload::MACPayload(lrwn::MACPayload { - fhdr: lrwn::FHDR { - devaddr: lrwn::DevAddr::from_be_bytes([1, 2, 3, 4]), - f_cnt: 7, - ..Default::default() - }, - f_port: Some(1), - frm_payload: None, - }), - mic: Some([48, 94, 26, 239]), - }, - assert: vec![assert::f_cnt_up(dev.dev_eui, 8)], - }, - Test { - name: "non-matching configuration id".into(), - dev_eui: dev.dev_eui, - device_queue_items: vec![], - before_func: None, - after_func: None, - device_session: Some(ds.clone()), - tx_info: tx_info.clone(), - rx_info: rx_info_invalid.clone(), - phy_payload: lrwn::PhyPayload { - mhdr: lrwn::MHDR { - m_type: lrwn::MType::UnconfirmedDataUp, - major: lrwn::Major::LoRaWANR1, - }, - payload: lrwn::Payload::MACPayload(lrwn::MACPayload { - fhdr: lrwn::FHDR { - devaddr: lrwn::DevAddr::from_be_bytes([1, 2, 3, 4]), - f_cnt: 7, - ..Default::default() - }, - f_port: Some(1), - frm_payload: None, - }), - mic: Some([48, 94, 26, 239]), - }, - assert: vec![assert::f_cnt_up(dev.dev_eui, 7)], - }, - ]; - - for tst in &tests { - run_test(tst).await; - } -} - #[tokio::test] async fn test_lorawan_10_errors() { let _guard = test::prepare().await; @@ -429,17 +253,11 @@ async fn test_lorawan_10_errors() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -626,17 +444,11 @@ async fn test_lorawan_11_errors() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info_freq = gw::UplinkTxInfo { frequency: 868300000, @@ -781,17 +593,11 @@ async fn test_lorawan_10_skip_f_cnt() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -975,17 +781,11 @@ async fn test_lorawan_10_device_disabled() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -1098,17 +898,11 @@ async fn test_lorawan_10_uplink() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -1735,17 +1529,11 @@ async fn test_lorawan_10_end_to_end_enc() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -2063,17 +1851,11 @@ async fn test_lorawan_11_uplink() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -2302,17 +2084,11 @@ async fn test_lorawan_10_rx_delay() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -2747,17 +2523,11 @@ async fn test_lorawan_10_mac_commands() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -3118,17 +2888,11 @@ async fn test_lorawan_11_mac_commands() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -3312,17 +3076,11 @@ async fn test_lorawan_10_device_queue() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -3789,17 +3547,11 @@ async fn test_lorawan_11_device_queue() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -4270,17 +4022,11 @@ async fn test_lorawan_10_adr() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -5113,17 +4859,11 @@ async fn test_lorawan_10_device_status_request() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -5378,17 +5118,11 @@ async fn test_lorawan_11_receive_window_selection() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -5807,6 +5541,8 @@ async fn run_test(t: &Test) { } uplink::handle_uplink( + CommonName::EU868, + "eu868".into(), Uuid::new_v4(), gw::UplinkFrameSet { phy_payload: t.phy_payload.to_vec().unwrap(), diff --git a/chirpstack/src/test/class_b_test.rs b/chirpstack/src/test/class_b_test.rs index b247ff4d..a63f3472 100644 --- a/chirpstack/src/test/class_b_test.rs +++ b/chirpstack/src/test/class_b_test.rs @@ -12,6 +12,7 @@ use crate::{ uplink, }; use chirpstack_api::{common, gw, internal}; +use lrwn::region::CommonName; use lrwn::{DevAddr, EUI64}; struct UplinkTest { @@ -88,16 +89,10 @@ async fn test_uplink() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -466,6 +461,8 @@ async fn run_uplink_test(t: &UplinkTest) { } uplink::handle_uplink( + CommonName::EU868, + "eu868".into(), Uuid::new_v4(), gw::UplinkFrameSet { phy_payload: t.phy_payload.to_vec().unwrap(), diff --git a/chirpstack/src/test/otaa_js_test.rs b/chirpstack/src/test/otaa_js_test.rs index 7a7921f7..ec0f794e 100644 --- a/chirpstack/src/test/otaa_js_test.rs +++ b/chirpstack/src/test/otaa_js_test.rs @@ -8,6 +8,7 @@ use crate::{ uplink, }; use chirpstack_api::{common, gw, integration as integration_pb, internal}; +use lrwn::region::CommonName; use lrwn::{DevAddr, EUI64Prefix, EUI64}; struct Test { @@ -76,17 +77,11 @@ async fn test_js() { }; uplink::helpers::set_uplink_modulation("eu868", &mut tx_info, 0).unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let phy = lrwn::PhyPayload { mhdr: lrwn::MHDR { @@ -371,6 +366,8 @@ async fn run_test(t: &Test) { gateway_backend::mock::reset().await; uplink::handle_uplink( + CommonName::EU868, + "eu868".into(), Uuid::new_v4(), gw::UplinkFrameSet { phy_payload: t.phy_payload.to_vec().unwrap(), diff --git a/chirpstack/src/test/otaa_pr_test.rs b/chirpstack/src/test/otaa_pr_test.rs index ecad919c..a86de504 100644 --- a/chirpstack/src/test/otaa_pr_test.rs +++ b/chirpstack/src/test/otaa_pr_test.rs @@ -17,6 +17,7 @@ use crate::storage::{ }; use crate::{config, storage::fields, test, uplink}; use chirpstack_api::gw; +use lrwn::region::CommonName; use lrwn::{AES128Key, EUI64Prefix, NetID, EUI64}; #[tokio::test] @@ -68,18 +69,12 @@ async fn test_fns() { let recv_time = Utc::now(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), gw_time: Some(recv_time.into()), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -194,6 +189,8 @@ async fn test_fns() { // Simulate uplink uplink::handle_uplink( + CommonName::EU868, + "eu868".into(), Uuid::new_v4(), gw::UplinkFrameSet { phy_payload: jr_phy.to_vec().unwrap(), diff --git a/chirpstack/src/test/otaa_test.rs b/chirpstack/src/test/otaa_test.rs index 84622d87..5e2a3c22 100644 --- a/chirpstack/src/test/otaa_test.rs +++ b/chirpstack/src/test/otaa_test.rs @@ -15,6 +15,7 @@ use crate::{ }; use chirpstack_api::{common, gw, internal, stream}; use lrwn::keys::get_js_int_key; +use lrwn::region::CommonName; use lrwn::{AES128Key, EUI64}; type Function = Box Pin>>>; @@ -113,29 +114,17 @@ async fn test_gateway_filtering() { .await .unwrap(); - let mut rx_info_a = gw::UplinkRxInfo { + let rx_info_a = gw::UplinkRxInfo { gateway_id: gw_a.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info_a - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info_a - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); - let mut rx_info_b = gw::UplinkRxInfo { + let rx_info_b = gw::UplinkRxInfo { gateway_id: gw_b.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info_b - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info_b - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -289,17 +278,11 @@ async fn test_lorawan_10() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -949,17 +932,11 @@ async fn test_lorawan_11() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -1277,6 +1254,8 @@ async fn run_test(t: &Test) { } uplink::handle_uplink( + CommonName::EU868, + "eu868".into(), Uuid::new_v4(), gw::UplinkFrameSet { phy_payload: t.phy_payload.to_vec().unwrap(), diff --git a/chirpstack/src/test/relay_class_a_test.rs b/chirpstack/src/test/relay_class_a_test.rs index 73517a5d..14d3c721 100644 --- a/chirpstack/src/test/relay_class_a_test.rs +++ b/chirpstack/src/test/relay_class_a_test.rs @@ -10,6 +10,7 @@ use crate::storage::{ }; use crate::{gateway::backend as gateway_backend, integration, test, uplink}; use chirpstack_api::{common, gw, integration as integration_pb, internal}; +use lrwn::region::CommonName; use lrwn::{AES128Key, DevAddr, EUI64}; struct Test { @@ -105,17 +106,11 @@ async fn test_lorawan_10() { .await .unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -813,6 +808,8 @@ async fn run_test(t: &Test) { } uplink::handle_uplink( + CommonName::EU868, + "eu868".into(), Uuid::new_v4(), gw::UplinkFrameSet { phy_payload: t.phy_payload.to_vec().unwrap(), diff --git a/chirpstack/src/test/relay_otaa_test.rs b/chirpstack/src/test/relay_otaa_test.rs index faf21196..90ec5bda 100644 --- a/chirpstack/src/test/relay_otaa_test.rs +++ b/chirpstack/src/test/relay_otaa_test.rs @@ -10,6 +10,7 @@ use crate::storage::{ }; use crate::{gateway::backend as gateway_backend, integration, test, uplink}; use chirpstack_api::{common, gw, internal}; +use lrwn::region::CommonName; use lrwn::{AES128Key, DevAddr, EUI64}; #[tokio::test] @@ -120,17 +121,11 @@ async fn test_lorawan_10() { let ds_relay = dev_relay.get_device_session().unwrap(); - let mut rx_info = gw::UplinkRxInfo { + let rx_info = gw::UplinkRxInfo { gateway_id: gw.gateway_id.to_string(), location: Some(Default::default()), ..Default::default() }; - rx_info - .metadata - .insert("region_config_id".to_string(), "eu868".to_string()); - rx_info - .metadata - .insert("region_common_name".to_string(), "EU868".to_string()); let mut tx_info = gw::UplinkTxInfo { frequency: 868100000, @@ -221,6 +216,8 @@ async fn test_lorawan_10() { .unwrap(); uplink::handle_uplink( + CommonName::EU868, + "eu868".into(), Uuid::new_v4(), gw::UplinkFrameSet { phy_payload: phy_relay_jr.to_vec().unwrap(), diff --git a/chirpstack/src/uplink/data.rs b/chirpstack/src/uplink/data.rs index e7515621..56b1384b 100644 --- a/chirpstack/src/uplink/data.rs +++ b/chirpstack/src/uplink/data.rs @@ -6,10 +6,7 @@ use chrono::{DateTime, Duration, Local, Utc}; use tracing::{debug, error, info, span, trace, warn, Instrument, Level}; use super::error::Error; -use super::{ - data_fns, filter_rx_info_by_region_config_id, filter_rx_info_by_tenant_id, helpers, - RelayContext, UplinkFrameSet, -}; +use super::{data_fns, filter_rx_info_by_tenant_id, helpers, RelayContext, UplinkFrameSet}; use crate::api::helpers::ToProto; use crate::backend::roaming; use crate::helpers::errors::PrintFullError; @@ -124,7 +121,6 @@ impl Data { // In case of roaming we do not know the gateways and therefore it must not be // filtered. ctx.filter_rx_info_by_tenant().await?; - ctx.filter_rx_info_by_region_config_id()?; } ctx.set_device_info()?; ctx.set_device_gateway_rx_info()?; @@ -238,6 +234,7 @@ impl Data { }; match device::get_for_phypayload_and_incr_f_cnt_up( + &self.uplink_frame_set.region_config_id, false, &mut self.phy_payload, self.uplink_frame_set.dr, @@ -305,8 +302,14 @@ impl Data { dr, )? as u8; - match device::get_for_phypayload_and_incr_f_cnt_up(true, &mut self.phy_payload, dr, ch) - .await + match device::get_for_phypayload_and_incr_f_cnt_up( + &self.uplink_frame_set.region_config_id, + true, + &mut self.phy_payload, + dr, + ch, + ) + .await { Ok(v) => match v { device::ValidationStatus::Ok(f_cnt, d) => { @@ -572,17 +575,6 @@ impl Data { } } - fn filter_rx_info_by_region_config_id(&mut self) -> Result<()> { - trace!("Filtering rx_info by region_config_id"); - - let dp = self.device_profile.as_ref().unwrap(); - if let Some(v) = &dp.region_config_id { - filter_rx_info_by_region_config_id(v, &mut self.uplink_frame_set)?; - } - - Ok(()) - } - fn decrypt_f_opts_mac_commands(&mut self) -> Result<()> { trace!("Decrypting mac-commands"); let ds = self.device.as_ref().unwrap().get_device_session()?; diff --git a/chirpstack/src/uplink/join.rs b/chirpstack/src/uplink/join.rs index 0323e3ae..52e859b2 100644 --- a/chirpstack/src/uplink/join.rs +++ b/chirpstack/src/uplink/join.rs @@ -12,10 +12,7 @@ use lrwn::{ use super::error::Error; use super::join_fns; -use super::{ - filter_rx_info_by_region_config_id, filter_rx_info_by_tenant_id, helpers, RelayContext, - UplinkFrameSet, -}; +use super::{filter_rx_info_by_tenant_id, helpers, RelayContext, UplinkFrameSet}; use crate::api::{backend::get_async_receiver, helpers::ToProto}; use crate::backend::{joinserver, keywrap, roaming}; @@ -119,8 +116,8 @@ impl JoinRequest { ctx.get_device_data_or_try_pr_roaming().await?; ctx.get_device_keys_or_js_client().await?; // used to validate MIC + if we need external JS ctx.set_device_info()?; + ctx.validate_region_config_id()?; ctx.filter_rx_info_by_tenant()?; - ctx.filter_rx_info_by_region_config_id()?; ctx.abort_on_device_is_disabled()?; ctx.abort_on_relay_only_comm()?; ctx.log_uplink_frame_set().await?; @@ -337,6 +334,20 @@ impl JoinRequest { Ok(()) } + fn validate_region_config_id(&self) -> Result<(), Error> { + trace!("Validating region_config_id against device-profile"); + + let dp = self.device_profile.as_ref().unwrap(); + if let Some(v) = &dp.region_config_id { + if !self.uplink_frame_set.region_config_id.eq(v) { + warn!("Aborting as region config ID does not match with device-profile"); + return Err(Error::Abort); + } + } + + Ok(()) + } + fn filter_rx_info_by_tenant(&mut self) -> Result<()> { trace!("Filtering rx_info by tenant_id"); @@ -347,17 +358,6 @@ impl JoinRequest { Ok(()) } - fn filter_rx_info_by_region_config_id(&mut self) -> Result<()> { - trace!("Filtering rx_info by region_config_id"); - - let dp = self.device_profile.as_ref().unwrap(); - if let Some(v) = &dp.region_config_id { - filter_rx_info_by_region_config_id(v, &mut self.uplink_frame_set)?; - } - - Ok(()) - } - async fn log_uplink_frame_set(&self) -> Result<()> { trace!("Logging uplink frame-set"); let ufl: stream_pb::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; diff --git a/chirpstack/src/uplink/mod.rs b/chirpstack/src/uplink/mod.rs index a8f603bd..a1b3cf7e 100644 --- a/chirpstack/src/uplink/mod.rs +++ b/chirpstack/src/uplink/mod.rs @@ -156,21 +156,35 @@ pub struct RoamingMetaData { pub ul_meta_data: backend::ULMetaData, } -pub async fn deduplicate_uplink(event: gw::UplinkFrame) { - if let Err(e) = _deduplicate_uplink(event).await { +pub async fn deduplicate_uplink( + region_common_name: CommonName, + region_config_id: String, + event: gw::UplinkFrame, +) { + if let Err(e) = _deduplicate_uplink(region_common_name, ®ion_config_id, event).await { error!(error = %e.full(), "Deduplication error"); } } -async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> { +async fn _deduplicate_uplink( + region_common_name: CommonName, + region_config_id: &str, + event: gw::UplinkFrame, +) -> Result<()> { let phy_str = hex::encode(&event.phy_payload); let tx_info_str = match &event.tx_info { Some(tx_info) => hex::encode(tx_info.encode_to_vec()), None => "".to_string(), }; - let key = redis_key(format!("up:collect:{{{}:{}}}", tx_info_str, phy_str)); - let lock_key = redis_key(format!("up:collect:{{{}:{}}}:lock", tx_info_str, phy_str)); + let key = redis_key(format!( + "up:collect:{{{}:{}:{}}}", + region_config_id, tx_info_str, phy_str + )); + let lock_key = redis_key(format!( + "up:collect:{{{}:{}:{}}}:lock", + region_config_id, tx_info_str, phy_str + )); let dedup_delay = config::get().network.deduplication_delay; let mut dedup_ttl = dedup_delay * 2; @@ -207,9 +221,14 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> { let deduplication_id = Uuid::new_v4(); let span = span!(Level::INFO, "up", deduplication_id = %deduplication_id); - handle_uplink(deduplication_id, uplink) - .instrument(span) - .await?; + handle_uplink( + region_common_name, + region_config_id, + deduplication_id, + uplink, + ) + .instrument(span) + .await?; Ok(()) } @@ -283,30 +302,16 @@ async fn deduplicate_collect(key: &str) -> Result { Ok(pl) } -pub async fn handle_uplink(deduplication_id: Uuid, uplink: gw::UplinkFrameSet) -> Result<()> { - let rx_info = &uplink - .rx_info - .first() - .context("Unable to get first item from rx_info")?; - - let region_config_id = rx_info - .metadata - .get("region_config_id") - .cloned() - .unwrap_or_default(); - - let common_name = rx_info - .metadata - .get("region_common_name") - .cloned() - .unwrap_or_default(); - - let common_name = CommonName::from_str(&common_name)?; - +pub async fn handle_uplink( + region_common_name: CommonName, + region_config_id: &str, + deduplication_id: Uuid, + uplink: gw::UplinkFrameSet, +) -> Result<()> { let mut uplink = UplinkFrameSet { uplink_set_id: deduplication_id, - region_config_id, - region_common_name: common_name, + region_common_name, + region_config_id: region_config_id.to_string(), dr: 0, ch: 0, phy_payload: PhyPayload::from_slice(&uplink.phy_payload)?, @@ -409,16 +414,11 @@ async fn update_gateway_metadata(ufs: &mut UplinkFrameSet) -> Result<()> { } fn filter_rx_info_by_tenant_id(tenant_id: Uuid, uplink: &mut UplinkFrameSet) -> Result<()> { + let force_gws_private = config::get_force_gws_private(&uplink.region_config_id)?; let mut rx_info_set: Vec = Vec::new(); for rx_info in &uplink.rx_info_set { let gateway_id = EUI64::from_str(&rx_info.gateway_id).context("Gateway ID")?; - let region_config_id = rx_info - .metadata - .get("region_config_id") - .map(|v| v.to_string()) - .ok_or_else(|| anyhow!("No region_config_id in rx_info metadata"))?; - let force_gws_private = config::get_force_gws_private(®ion_config_id)?; if !(uplink .gateway_private_up_map @@ -466,25 +466,3 @@ fn filter_rx_info_by_public_only(uplink: &mut UplinkFrameSet) -> Result<()> { Ok(()) } - -fn filter_rx_info_by_region_config_id( - region_config_id: &str, - uplink: &mut UplinkFrameSet, -) -> Result<()> { - let mut rx_info_set: Vec = Vec::new(); - - for rx_info in &uplink.rx_info_set { - if let Some(v) = rx_info.metadata.get("region_config_id") { - if v == region_config_id { - rx_info_set.push(rx_info.clone()); - } - } - } - - uplink.rx_info_set = rx_info_set; - if uplink.rx_info_set.is_empty() { - return Err(anyhow!("rx_info_set is empty")); - } - - Ok(()) -}