diff --git a/api/proto/api/device.proto b/api/proto/api/device.proto index 8108b62e..527f1a48 100644 --- a/api/proto/api/device.proto +++ b/api/proto/api/device.proto @@ -539,6 +539,10 @@ message DeviceQueueItem { // the data payload. In this case, the f_cnt_down field must be set to // the corresponding frame-counter which has been used during the encryption. bool is_encrypted = 9; + + // Expires at (optional). + // Expired queue-items will be automatically removed from the queue. + google.protobuf.Timestamp expires_at = 10; } message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; } @@ -582,4 +586,4 @@ message GetDeviceNextFCntDownRequest { message GetDeviceNextFCntDownResponse { // FCntDown. uint32 f_cnt_down = 1; -} \ No newline at end of file +} diff --git a/api/proto/api/multicast_group.proto b/api/proto/api/multicast_group.proto index 92104e0f..8fec49bc 100644 --- a/api/proto/api/multicast_group.proto +++ b/api/proto/api/multicast_group.proto @@ -302,6 +302,10 @@ message MulticastGroupQueueItem { // Payload. bytes data = 4; + + // Expires at (optional). + // Expired queue-items will be automatically removed from the queue. + google.protobuf.Timestamp expires_at = 5; } message EnqueueMulticastGroupQueueItemRequest { diff --git a/api/proto/integration/integration.proto b/api/proto/integration/integration.proto index c96d32d9..b38227ac 100644 --- a/api/proto/integration/integration.proto +++ b/api/proto/integration/integration.proto @@ -60,6 +60,9 @@ enum LogCode { // Downlink frame-counter. F_CNT_DOWN = 10; + + // Downlink has expired. + EXPIRED = 11; } // Device information. diff --git a/api/rust/proto/chirpstack/api/device.proto b/api/rust/proto/chirpstack/api/device.proto index 8108b62e..527f1a48 100644 --- a/api/rust/proto/chirpstack/api/device.proto +++ b/api/rust/proto/chirpstack/api/device.proto @@ -539,6 +539,10 @@ message DeviceQueueItem { // the data payload. In this case, the f_cnt_down field must be set to // the corresponding frame-counter which has been used during the encryption. bool is_encrypted = 9; + + // Expires at (optional). + // Expired queue-items will be automatically removed from the queue. + google.protobuf.Timestamp expires_at = 10; } message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; } @@ -582,4 +586,4 @@ message GetDeviceNextFCntDownRequest { message GetDeviceNextFCntDownResponse { // FCntDown. uint32 f_cnt_down = 1; -} \ No newline at end of file +} diff --git a/api/rust/proto/chirpstack/api/multicast_group.proto b/api/rust/proto/chirpstack/api/multicast_group.proto index 92104e0f..8fec49bc 100644 --- a/api/rust/proto/chirpstack/api/multicast_group.proto +++ b/api/rust/proto/chirpstack/api/multicast_group.proto @@ -302,6 +302,10 @@ message MulticastGroupQueueItem { // Payload. bytes data = 4; + + // Expires at (optional). + // Expired queue-items will be automatically removed from the queue. + google.protobuf.Timestamp expires_at = 5; } message EnqueueMulticastGroupQueueItemRequest { diff --git a/api/rust/proto/chirpstack/integration/integration.proto b/api/rust/proto/chirpstack/integration/integration.proto index c96d32d9..b38227ac 100644 --- a/api/rust/proto/chirpstack/integration/integration.proto +++ b/api/rust/proto/chirpstack/integration/integration.proto @@ -60,6 +60,9 @@ enum LogCode { // Downlink frame-counter. F_CNT_DOWN = 10; + + // Downlink has expired. + EXPIRED = 11; } // Device information. diff --git a/api/rust/src/integration.rs b/api/rust/src/integration.rs index 287f348c..09f847ef 100644 --- a/api/rust/src/integration.rs +++ b/api/rust/src/integration.rs @@ -32,6 +32,7 @@ impl Into for LogCode { LogCode::DownlinkGateway => "DOWNLINK_GATEWAY", LogCode::RelayNewEndDevice => "RELAY_NEW_END_DEVICE", LogCode::FCntDown => "F_CNT_DOWN", + LogCode::Expired => "EXPIRED", } .to_string() } diff --git a/chirpstack/migrations_postgres/2024-09-16-123034_add_queue_expires_at/down.sql b/chirpstack/migrations_postgres/2024-09-16-123034_add_queue_expires_at/down.sql new file mode 100644 index 00000000..b4d30d19 --- /dev/null +++ b/chirpstack/migrations_postgres/2024-09-16-123034_add_queue_expires_at/down.sql @@ -0,0 +1,6 @@ +alter table device_queue_item + drop column expires_at; + +alter table multicast_group_queue_item + drop column expires_at; + diff --git a/chirpstack/migrations_postgres/2024-09-16-123034_add_queue_expires_at/up.sql b/chirpstack/migrations_postgres/2024-09-16-123034_add_queue_expires_at/up.sql new file mode 100644 index 00000000..6b509707 --- /dev/null +++ b/chirpstack/migrations_postgres/2024-09-16-123034_add_queue_expires_at/up.sql @@ -0,0 +1,6 @@ +alter table multicast_group_queue_item + add column expires_at timestamp with time zone null; + +alter table device_queue_item + add column expires_at timestamp with time zone null; + diff --git a/chirpstack/migrations_sqlite/2024-09-17-104125_add_queue_expires_at/down.sql b/chirpstack/migrations_sqlite/2024-09-17-104125_add_queue_expires_at/down.sql new file mode 100644 index 00000000..b4d30d19 --- /dev/null +++ b/chirpstack/migrations_sqlite/2024-09-17-104125_add_queue_expires_at/down.sql @@ -0,0 +1,6 @@ +alter table device_queue_item + drop column expires_at; + +alter table multicast_group_queue_item + drop column expires_at; + diff --git a/chirpstack/migrations_sqlite/2024-09-17-104125_add_queue_expires_at/up.sql b/chirpstack/migrations_sqlite/2024-09-17-104125_add_queue_expires_at/up.sql new file mode 100644 index 00000000..2c8219b1 --- /dev/null +++ b/chirpstack/migrations_sqlite/2024-09-17-104125_add_queue_expires_at/up.sql @@ -0,0 +1,5 @@ +alter table multicast_group_queue_item + add column expires_at datetime null; + +alter table device_queue_item + add column expires_at datetime null; diff --git a/chirpstack/src/api/device.rs b/chirpstack/src/api/device.rs index 7a77a617..41128551 100644 --- a/chirpstack/src/api/device.rs +++ b/chirpstack/src/api/device.rs @@ -1095,6 +1095,14 @@ impl DeviceService for Device { } else { None }, + expires_at: if let Some(expires_at) = req_qi.expires_at { + let expires_at: std::time::SystemTime = expires_at + .try_into() + .map_err(|e: prost_types::TimestampError| e.status())?; + Some(expires_at.into()) + } else { + None + }, data, ..Default::default() }; @@ -1169,6 +1177,10 @@ impl DeviceService for Device { is_pending: qi.is_pending, f_cnt_down: qi.f_cnt_down.unwrap_or(0) as u32, is_encrypted: qi.is_encrypted, + expires_at: qi.expires_at.map(|v| { + let v: std::time::SystemTime = v.into(); + v.into() + }), }) .collect(), }); diff --git a/chirpstack/src/api/multicast.rs b/chirpstack/src/api/multicast.rs index 0a1deb85..7f760ef8 100644 --- a/chirpstack/src/api/multicast.rs +++ b/chirpstack/src/api/multicast.rs @@ -411,6 +411,14 @@ impl MulticastGroupService for MulticastGroup { multicast_group_id: mg_id.into(), f_port: req_enq.f_port as i16, data: req_enq.data.clone(), + expires_at: if let Some(expires_at) = req_enq.expires_at { + let expires_at: std::time::SystemTime = expires_at + .try_into() + .map_err(|e: prost_types::TimestampError| e.status())?; + Some(expires_at.into()) + } else { + None + }, ..Default::default() }) .await @@ -478,6 +486,10 @@ impl MulticastGroupService for MulticastGroup { f_cnt: qi.f_cnt as u32, f_port: qi.f_port as u32, data: qi.data.clone(), + expires_at: qi.expires_at.map(|v| { + let v: std::time::SystemTime = v.into(); + v.into() + }), }); } } @@ -778,6 +790,7 @@ pub mod test { f_cnt: 31, f_port: 10, data: vec![1, 2, 3], + expires_at: None, }, list_queue_resp.items[0] ); diff --git a/chirpstack/src/downlink/data.rs b/chirpstack/src/downlink/data.rs index bd79af54..67d381c3 100644 --- a/chirpstack/src/downlink/data.rs +++ b/chirpstack/src/downlink/data.rs @@ -464,9 +464,11 @@ impl Data { // The queue item: // * should fit within the max payload size // * should not be pending + // * should not be expired // * in case encrypted, should have a valid FCntDown if qi.data.len() <= max_payload_size && !qi.is_pending + && !(qi.expires_at.is_some() && qi.expires_at.unwrap() < Utc::now()) && !(qi.is_encrypted && (qi.f_cnt_down.unwrap_or_default() as u32) < ds.get_a_f_cnt_down()) { @@ -526,6 +528,34 @@ impl Data { continue; } + // Handle expired payload. + if let Some(expires_at) = qi.expires_at { + if expires_at < Utc::now() { + device_queue::delete_item(&qi.id) + .await + .context("Delete device queue-item")?; + + let pl = integration_pb::LogEvent { + time: Some(Utc::now().into()), + device_info: Some(device_info.clone()), + level: integration_pb::LogLevel::Error.into(), + code: integration_pb::LogCode::Expired.into(), + description: "Device queue-item discarded because it has expired" + .to_string(), + context: [("queue_item_id".to_string(), qi.id.to_string())] + .iter() + .cloned() + .collect(), + }; + + integration::log_event(self.application.id.into(), &self.device.variables, &pl) + .await; + warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because it has expired"); + + continue; + } + } + // Handle payload size. if qi.data.len() > max_payload_size { device_queue::delete_item(&qi.id) @@ -2767,6 +2797,41 @@ mod test { ..Default::default() }), }, + Test { + name: "item has expired".into(), + max_payload_size: 10, + queue_items: vec![device_queue::DeviceQueueItem { + id: qi_id.into(), + dev_eui: d.dev_eui, + f_port: 1, + data: vec![1, 2, 3], + expires_at: Some(Utc::now() - chrono::Duration::seconds(10)), + ..Default::default() + }], + expected_queue_item: None, + expected_ack_event: None, + expected_log_event: Some(integration_pb::LogEvent { + device_info: Some(integration_pb::DeviceInfo { + tenant_id: t.id.to_string(), + tenant_name: t.name.clone(), + application_id: app.id.to_string(), + application_name: app.name.clone(), + device_profile_id: dp.id.to_string(), + device_profile_name: dp.name.clone(), + device_name: d.name.clone(), + dev_eui: d.dev_eui.to_string(), + ..Default::default() + }), + level: integration_pb::LogLevel::Error.into(), + code: integration_pb::LogCode::Expired.into(), + description: "Device queue-item discarded because it has expired".into(), + context: [("queue_item_id".to_string(), qi_id.to_string())] + .iter() + .cloned() + .collect(), + ..Default::default() + }), + }, Test { name: "is pending".into(), max_payload_size: 10, diff --git a/chirpstack/src/downlink/multicast.rs b/chirpstack/src/downlink/multicast.rs index 11a25cf1..b4aa880c 100644 --- a/chirpstack/src/downlink/multicast.rs +++ b/chirpstack/src/downlink/multicast.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet}; use anyhow::{Context, Result}; +use chrono::Utc; use petgraph::algo::min_spanning_tree; use petgraph::data::FromElements; use petgraph::graph::{DefaultIx, Graph, NodeIndex, UnGraph}; @@ -55,6 +56,7 @@ impl Multicast { ctx.get_gateway().await?; ctx.set_region_config_id()?; ctx.get_multicast_group().await?; + ctx.validate_expiration().await?; ctx.validate_payload_size().await?; ctx.set_tx_info()?; ctx.set_phy_payload()?; @@ -90,6 +92,22 @@ impl Multicast { Ok(()) } + async fn validate_expiration(&self) -> Result<()> { + trace!("Validating expires_at"); + if let Some(expires_at) = self.multicast_group_queue_item.expires_at { + if Utc::now() > expires_at { + warn!( + expires_at = %expires_at, + "Discarding multicast-group queue item because it has expired" + ); + multicast::delete_queue_item(&self.multicast_group_queue_item.id).await?; + return Err(anyhow!("Queue item has expired and has been discarded")); + } + } + + Ok(()) + } + async fn validate_payload_size(&self) -> Result<()> { trace!("Validating payload size for DR"); let mg = self.multicast_group.as_ref().unwrap(); diff --git a/chirpstack/src/storage/device_queue.rs b/chirpstack/src/storage/device_queue.rs index aa0aa959..0d066313 100644 --- a/chirpstack/src/storage/device_queue.rs +++ b/chirpstack/src/storage/device_queue.rs @@ -22,6 +22,7 @@ pub struct DeviceQueueItem { pub f_cnt_down: Option, pub timeout_after: Option>, pub is_encrypted: bool, + pub expires_at: Option>, } impl DeviceQueueItem { @@ -57,6 +58,7 @@ impl Default for DeviceQueueItem { f_cnt_down: None, timeout_after: None, is_encrypted: false, + expires_at: None, } } } diff --git a/chirpstack/src/storage/multicast.rs b/chirpstack/src/storage/multicast.rs index f2000924..a75f4a15 100644 --- a/chirpstack/src/storage/multicast.rs +++ b/chirpstack/src/storage/multicast.rs @@ -98,6 +98,7 @@ pub struct MulticastGroupQueueItem { pub f_port: i16, pub data: Vec, pub emit_at_time_since_gps_epoch: Option, + pub expires_at: Option>, } impl MulticastGroupQueueItem { @@ -126,6 +127,7 @@ impl Default for MulticastGroupQueueItem { f_port: 0, data: vec![], emit_at_time_since_gps_epoch: None, + expires_at: None, } } } @@ -471,6 +473,7 @@ pub async fn enqueue( emit_at_time_since_gps_epoch: Some( emit_at_time_since_gps_epoch.num_milliseconds(), ), + expires_at: qi.expires_at.clone(), ..Default::default() }; @@ -543,6 +546,7 @@ pub async fn enqueue( f_port: qi.f_port, data: qi.data.clone(), emit_at_time_since_gps_epoch, + expires_at: qi.expires_at.clone(), ..Default::default() }; diff --git a/chirpstack/src/storage/schema_postgres.rs b/chirpstack/src/storage/schema_postgres.rs index 22c514dd..c4d27e9c 100644 --- a/chirpstack/src/storage/schema_postgres.rs +++ b/chirpstack/src/storage/schema_postgres.rs @@ -203,6 +203,7 @@ diesel::table! { f_cnt_down -> Nullable, timeout_after -> Nullable, is_encrypted -> Bool, + expires_at -> Nullable, } } @@ -277,6 +278,7 @@ diesel::table! { f_port -> Int2, data -> Bytea, emit_at_time_since_gps_epoch -> Nullable, + expires_at -> Nullable, } } diff --git a/chirpstack/src/storage/schema_sqlite.rs b/chirpstack/src/storage/schema_sqlite.rs index 047c5852..a7d380b7 100644 --- a/chirpstack/src/storage/schema_sqlite.rs +++ b/chirpstack/src/storage/schema_sqlite.rs @@ -183,6 +183,7 @@ diesel::table! { f_cnt_down -> Nullable, timeout_after -> Nullable, is_encrypted -> Bool, + expires_at -> Nullable, } } @@ -252,6 +253,7 @@ diesel::table! { f_port -> SmallInt, data -> Binary, emit_at_time_since_gps_epoch -> Nullable, + expires_at -> Nullable, } } diff --git a/chirpstack/src/test/multicast_test.rs b/chirpstack/src/test/multicast_test.rs index 5645abbf..6267a816 100644 --- a/chirpstack/src/test/multicast_test.rs +++ b/chirpstack/src/test/multicast_test.rs @@ -1,4 +1,4 @@ -use chrono::Utc; +use chrono::{Duration, Utc}; use super::assert; use crate::storage::{ @@ -118,7 +118,7 @@ async fn test_multicast() { name: "one item in queue".into(), multicast_group: mg.clone(), multicast_group_queue_items: vec![multicast::MulticastGroupQueueItem { - multicast_group_id: mg.id.into(), + multicast_group_id: mg.id, f_port: 5, data: vec![1, 2, 3], ..Default::default() @@ -160,13 +160,13 @@ async fn test_multicast() { multicast_group: mg.clone(), multicast_group_queue_items: vec![ multicast::MulticastGroupQueueItem { - multicast_group_id: mg.id.into(), + multicast_group_id: mg.id, f_port: 5, data: vec![1, 2, 3], ..Default::default() }, multicast::MulticastGroupQueueItem { - multicast_group_id: mg.id.into(), + multicast_group_id: mg.id, f_port: 6, data: vec![1, 2, 3], ..Default::default() @@ -209,13 +209,13 @@ async fn test_multicast() { multicast_group: mg.clone(), multicast_group_queue_items: vec![ multicast::MulticastGroupQueueItem { - multicast_group_id: mg.id.into(), + multicast_group_id: mg.id, f_port: 5, data: vec![2; 300], ..Default::default() }, multicast::MulticastGroupQueueItem { - multicast_group_id: mg.id.into(), + multicast_group_id: mg.id, f_port: 6, data: vec![1, 2, 3], ..Default::default() @@ -223,6 +223,18 @@ async fn test_multicast() { ], assert: vec![assert::no_downlink_frame()], }, + MulticastTest { + name: "item discarded because it has expired".into(), + multicast_group: mg.clone(), + multicast_group_queue_items: vec![multicast::MulticastGroupQueueItem { + multicast_group_id: mg.id, + f_port: 5, + data: vec![1, 2, 3], + expires_at: Some(Utc::now() - Duration::seconds(10)), + ..Default::default() + }], + assert: vec![assert::no_downlink_frame()], + }, ]; for tst in &tests {