Add expires_at to queue-items (unicast & multicast).

This makes it possible to automatically remove items from the queue in
case the expires_at timestamp has been reached. This field is optional and
the default remains to never expire queue-items.
This commit is contained in:
Orne Brocaar 2024-09-17 11:54:29 +01:00
parent 2919fb79e5
commit 3829f591e4
20 changed files with 184 additions and 8 deletions

View File

@ -539,6 +539,10 @@ message DeviceQueueItem {
// the data payload. In this case, the f_cnt_down field must be set to // the data payload. In this case, the f_cnt_down field must be set to
// the corresponding frame-counter which has been used during the encryption. // the corresponding frame-counter which has been used during the encryption.
bool is_encrypted = 9; bool is_encrypted = 9;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 10;
} }
message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; } message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; }
@ -582,4 +586,4 @@ message GetDeviceNextFCntDownRequest {
message GetDeviceNextFCntDownResponse { message GetDeviceNextFCntDownResponse {
// FCntDown. // FCntDown.
uint32 f_cnt_down = 1; uint32 f_cnt_down = 1;
} }

View File

@ -302,6 +302,10 @@ message MulticastGroupQueueItem {
// Payload. // Payload.
bytes data = 4; bytes data = 4;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 5;
} }
message EnqueueMulticastGroupQueueItemRequest { message EnqueueMulticastGroupQueueItemRequest {

View File

@ -60,6 +60,9 @@ enum LogCode {
// Downlink frame-counter. // Downlink frame-counter.
F_CNT_DOWN = 10; F_CNT_DOWN = 10;
// Downlink has expired.
EXPIRED = 11;
} }
// Device information. // Device information.

View File

@ -539,6 +539,10 @@ message DeviceQueueItem {
// the data payload. In this case, the f_cnt_down field must be set to // the data payload. In this case, the f_cnt_down field must be set to
// the corresponding frame-counter which has been used during the encryption. // the corresponding frame-counter which has been used during the encryption.
bool is_encrypted = 9; bool is_encrypted = 9;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 10;
} }
message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; } message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; }
@ -582,4 +586,4 @@ message GetDeviceNextFCntDownRequest {
message GetDeviceNextFCntDownResponse { message GetDeviceNextFCntDownResponse {
// FCntDown. // FCntDown.
uint32 f_cnt_down = 1; uint32 f_cnt_down = 1;
} }

View File

@ -302,6 +302,10 @@ message MulticastGroupQueueItem {
// Payload. // Payload.
bytes data = 4; bytes data = 4;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 5;
} }
message EnqueueMulticastGroupQueueItemRequest { message EnqueueMulticastGroupQueueItemRequest {

View File

@ -60,6 +60,9 @@ enum LogCode {
// Downlink frame-counter. // Downlink frame-counter.
F_CNT_DOWN = 10; F_CNT_DOWN = 10;
// Downlink has expired.
EXPIRED = 11;
} }
// Device information. // Device information.

View File

@ -32,6 +32,7 @@ impl Into<String> for LogCode {
LogCode::DownlinkGateway => "DOWNLINK_GATEWAY", LogCode::DownlinkGateway => "DOWNLINK_GATEWAY",
LogCode::RelayNewEndDevice => "RELAY_NEW_END_DEVICE", LogCode::RelayNewEndDevice => "RELAY_NEW_END_DEVICE",
LogCode::FCntDown => "F_CNT_DOWN", LogCode::FCntDown => "F_CNT_DOWN",
LogCode::Expired => "EXPIRED",
} }
.to_string() .to_string()
} }

View File

@ -0,0 +1,6 @@
-- Down migration: revert the expires_at feature by dropping the column
-- from both queue tables. Dropping the column also discards any expiry
-- values already stored; items then never expire again (the prior default).
alter table device_queue_item
drop column expires_at;
alter table multicast_group_queue_item
drop column expires_at;

View File

@ -0,0 +1,6 @@
-- Up migration (PostgreSQL): add the optional expires_at column to both
-- queue tables. "timestamp with time zone" matches the Timestamptz mapping
-- used by the diesel schema; nullable so existing rows default to
-- "never expires", preserving prior behavior.
alter table multicast_group_queue_item
add column expires_at timestamp with time zone null;
alter table device_queue_item
add column expires_at timestamp with time zone null;

View File

@ -0,0 +1,6 @@
-- Down migration: drop the expires_at column from both queue tables.
-- NOTE(review): presumably the SQLite variant, given the adjacent "datetime"
-- up migration — confirm against the migration directory layout.
alter table device_queue_item
drop column expires_at;
alter table multicast_group_queue_item
drop column expires_at;

View File

@ -0,0 +1,5 @@
-- Up migration (SQLite): add the optional expires_at column to both queue
-- tables. SQLite has no native timestamptz type; "datetime" here maps to the
-- TimestamptzSqlite column in the diesel schema. Nullable, so existing rows
-- keep the old "never expires" behavior.
alter table multicast_group_queue_item
add column expires_at datetime null;
alter table device_queue_item
add column expires_at datetime null;

View File

@ -1095,6 +1095,14 @@ impl DeviceService for Device {
} else { } else {
None None
}, },
expires_at: if let Some(expires_at) = req_qi.expires_at {
let expires_at: std::time::SystemTime = expires_at
.try_into()
.map_err(|e: prost_types::TimestampError| e.status())?;
Some(expires_at.into())
} else {
None
},
data, data,
..Default::default() ..Default::default()
}; };
@ -1169,6 +1177,10 @@ impl DeviceService for Device {
is_pending: qi.is_pending, is_pending: qi.is_pending,
f_cnt_down: qi.f_cnt_down.unwrap_or(0) as u32, f_cnt_down: qi.f_cnt_down.unwrap_or(0) as u32,
is_encrypted: qi.is_encrypted, is_encrypted: qi.is_encrypted,
expires_at: qi.expires_at.map(|v| {
let v: std::time::SystemTime = v.into();
v.into()
}),
}) })
.collect(), .collect(),
}); });

View File

@ -411,6 +411,14 @@ impl MulticastGroupService for MulticastGroup {
multicast_group_id: mg_id.into(), multicast_group_id: mg_id.into(),
f_port: req_enq.f_port as i16, f_port: req_enq.f_port as i16,
data: req_enq.data.clone(), data: req_enq.data.clone(),
expires_at: if let Some(expires_at) = req_enq.expires_at {
let expires_at: std::time::SystemTime = expires_at
.try_into()
.map_err(|e: prost_types::TimestampError| e.status())?;
Some(expires_at.into())
} else {
None
},
..Default::default() ..Default::default()
}) })
.await .await
@ -478,6 +486,10 @@ impl MulticastGroupService for MulticastGroup {
f_cnt: qi.f_cnt as u32, f_cnt: qi.f_cnt as u32,
f_port: qi.f_port as u32, f_port: qi.f_port as u32,
data: qi.data.clone(), data: qi.data.clone(),
expires_at: qi.expires_at.map(|v| {
let v: std::time::SystemTime = v.into();
v.into()
}),
}); });
} }
} }
@ -778,6 +790,7 @@ pub mod test {
f_cnt: 31, f_cnt: 31,
f_port: 10, f_port: 10,
data: vec![1, 2, 3], data: vec![1, 2, 3],
expires_at: None,
}, },
list_queue_resp.items[0] list_queue_resp.items[0]
); );

View File

@ -464,9 +464,11 @@ impl Data {
// The queue item: // The queue item:
// * should fit within the max payload size // * should fit within the max payload size
// * should not be pending // * should not be pending
// * should not be expired
// * in case encrypted, should have a valid FCntDown // * in case encrypted, should have a valid FCntDown
if qi.data.len() <= max_payload_size if qi.data.len() <= max_payload_size
&& !qi.is_pending && !qi.is_pending
&& !(qi.expires_at.is_some() && qi.expires_at.unwrap() < Utc::now())
&& !(qi.is_encrypted && !(qi.is_encrypted
&& (qi.f_cnt_down.unwrap_or_default() as u32) < ds.get_a_f_cnt_down()) && (qi.f_cnt_down.unwrap_or_default() as u32) < ds.get_a_f_cnt_down())
{ {
@ -526,6 +528,34 @@ impl Data {
continue; continue;
} }
// Handle expired payload.
if let Some(expires_at) = qi.expires_at {
if expires_at < Utc::now() {
device_queue::delete_item(&qi.id)
.await
.context("Delete device queue-item")?;
let pl = integration_pb::LogEvent {
time: Some(Utc::now().into()),
device_info: Some(device_info.clone()),
level: integration_pb::LogLevel::Error.into(),
code: integration_pb::LogCode::Expired.into(),
description: "Device queue-item discarded because it has expired"
.to_string(),
context: [("queue_item_id".to_string(), qi.id.to_string())]
.iter()
.cloned()
.collect(),
};
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because it has expired");
continue;
}
}
// Handle payload size. // Handle payload size.
if qi.data.len() > max_payload_size { if qi.data.len() > max_payload_size {
device_queue::delete_item(&qi.id) device_queue::delete_item(&qi.id)
@ -2767,6 +2797,41 @@ mod test {
..Default::default() ..Default::default()
}), }),
}, },
Test {
name: "item has expired".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
expires_at: Some(Utc::now() - chrono::Duration::seconds(10)),
..Default::default()
}],
expected_queue_item: None,
expected_ack_event: None,
expected_log_event: Some(integration_pb::LogEvent {
device_info: Some(integration_pb::DeviceInfo {
tenant_id: t.id.to_string(),
tenant_name: t.name.clone(),
application_id: app.id.to_string(),
application_name: app.name.clone(),
device_profile_id: dp.id.to_string(),
device_profile_name: dp.name.clone(),
device_name: d.name.clone(),
dev_eui: d.dev_eui.to_string(),
..Default::default()
}),
level: integration_pb::LogLevel::Error.into(),
code: integration_pb::LogCode::Expired.into(),
description: "Device queue-item discarded because it has expired".into(),
context: [("queue_item_id".to_string(), qi_id.to_string())]
.iter()
.cloned()
.collect(),
..Default::default()
}),
},
Test { Test {
name: "is pending".into(), name: "is pending".into(),
max_payload_size: 10, max_payload_size: 10,

View File

@ -1,6 +1,7 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use chrono::Utc;
use petgraph::algo::min_spanning_tree; use petgraph::algo::min_spanning_tree;
use petgraph::data::FromElements; use petgraph::data::FromElements;
use petgraph::graph::{DefaultIx, Graph, NodeIndex, UnGraph}; use petgraph::graph::{DefaultIx, Graph, NodeIndex, UnGraph};
@ -55,6 +56,7 @@ impl Multicast {
ctx.get_gateway().await?; ctx.get_gateway().await?;
ctx.set_region_config_id()?; ctx.set_region_config_id()?;
ctx.get_multicast_group().await?; ctx.get_multicast_group().await?;
ctx.validate_expiration().await?;
ctx.validate_payload_size().await?; ctx.validate_payload_size().await?;
ctx.set_tx_info()?; ctx.set_tx_info()?;
ctx.set_phy_payload()?; ctx.set_phy_payload()?;
@ -90,6 +92,22 @@ impl Multicast {
Ok(()) Ok(())
} }
/// Validates that the multicast-group queue-item has not expired.
///
/// When `expires_at` is unset the item never expires and validation
/// passes. When the deadline lies in the past, the queue-item is deleted
/// from storage and an error is returned so the downlink flow aborts.
async fn validate_expiration(&self) -> Result<()> {
    trace!("Validating expires_at");

    // No expiration configured -> nothing to validate.
    let Some(expires_at) = self.multicast_group_queue_item.expires_at else {
        return Ok(());
    };

    if expires_at < Utc::now() {
        warn!(
            expires_at = %expires_at,
            "Discarding multicast-group queue item because it has expired"
        );
        // Remove the stale item so it is not retried on the next tick.
        multicast::delete_queue_item(&self.multicast_group_queue_item.id).await?;
        return Err(anyhow!("Queue item has expired and has been discarded"));
    }

    Ok(())
}
async fn validate_payload_size(&self) -> Result<()> { async fn validate_payload_size(&self) -> Result<()> {
trace!("Validating payload size for DR"); trace!("Validating payload size for DR");
let mg = self.multicast_group.as_ref().unwrap(); let mg = self.multicast_group.as_ref().unwrap();

View File

@ -22,6 +22,7 @@ pub struct DeviceQueueItem {
pub f_cnt_down: Option<i64>, pub f_cnt_down: Option<i64>,
pub timeout_after: Option<DateTime<Utc>>, pub timeout_after: Option<DateTime<Utc>>,
pub is_encrypted: bool, pub is_encrypted: bool,
pub expires_at: Option<DateTime<Utc>>,
} }
impl DeviceQueueItem { impl DeviceQueueItem {
@ -57,6 +58,7 @@ impl Default for DeviceQueueItem {
f_cnt_down: None, f_cnt_down: None,
timeout_after: None, timeout_after: None,
is_encrypted: false, is_encrypted: false,
expires_at: None,
} }
} }
} }

View File

@ -98,6 +98,7 @@ pub struct MulticastGroupQueueItem {
pub f_port: i16, pub f_port: i16,
pub data: Vec<u8>, pub data: Vec<u8>,
pub emit_at_time_since_gps_epoch: Option<i64>, pub emit_at_time_since_gps_epoch: Option<i64>,
pub expires_at: Option<DateTime<Utc>>,
} }
impl MulticastGroupQueueItem { impl MulticastGroupQueueItem {
@ -126,6 +127,7 @@ impl Default for MulticastGroupQueueItem {
f_port: 0, f_port: 0,
data: vec![], data: vec![],
emit_at_time_since_gps_epoch: None, emit_at_time_since_gps_epoch: None,
expires_at: None,
} }
} }
} }
@ -471,6 +473,7 @@ pub async fn enqueue(
emit_at_time_since_gps_epoch: Some( emit_at_time_since_gps_epoch: Some(
emit_at_time_since_gps_epoch.num_milliseconds(), emit_at_time_since_gps_epoch.num_milliseconds(),
), ),
expires_at: qi.expires_at.clone(),
..Default::default() ..Default::default()
}; };
@ -543,6 +546,7 @@ pub async fn enqueue(
f_port: qi.f_port, f_port: qi.f_port,
data: qi.data.clone(), data: qi.data.clone(),
emit_at_time_since_gps_epoch, emit_at_time_since_gps_epoch,
expires_at: qi.expires_at.clone(),
..Default::default() ..Default::default()
}; };

View File

@ -203,6 +203,7 @@ diesel::table! {
f_cnt_down -> Nullable<Int8>, f_cnt_down -> Nullable<Int8>,
timeout_after -> Nullable<Timestamptz>, timeout_after -> Nullable<Timestamptz>,
is_encrypted -> Bool, is_encrypted -> Bool,
expires_at -> Nullable<Timestamptz>,
} }
} }
@ -277,6 +278,7 @@ diesel::table! {
f_port -> Int2, f_port -> Int2,
data -> Bytea, data -> Bytea,
emit_at_time_since_gps_epoch -> Nullable<Int8>, emit_at_time_since_gps_epoch -> Nullable<Int8>,
expires_at -> Nullable<Timestamptz>,
} }
} }

View File

@ -183,6 +183,7 @@ diesel::table! {
f_cnt_down -> Nullable<BigInt>, f_cnt_down -> Nullable<BigInt>,
timeout_after -> Nullable<TimestamptzSqlite>, timeout_after -> Nullable<TimestamptzSqlite>,
is_encrypted -> Bool, is_encrypted -> Bool,
expires_at -> Nullable<TimestamptzSqlite>,
} }
} }
@ -252,6 +253,7 @@ diesel::table! {
f_port -> SmallInt, f_port -> SmallInt,
data -> Binary, data -> Binary,
emit_at_time_since_gps_epoch -> Nullable<BigInt>, emit_at_time_since_gps_epoch -> Nullable<BigInt>,
expires_at -> Nullable<TimestamptzSqlite>,
} }
} }

View File

@ -1,4 +1,4 @@
use chrono::Utc; use chrono::{Duration, Utc};
use super::assert; use super::assert;
use crate::storage::{ use crate::storage::{
@ -118,7 +118,7 @@ async fn test_multicast() {
name: "one item in queue".into(), name: "one item in queue".into(),
multicast_group: mg.clone(), multicast_group: mg.clone(),
multicast_group_queue_items: vec![multicast::MulticastGroupQueueItem { multicast_group_queue_items: vec![multicast::MulticastGroupQueueItem {
multicast_group_id: mg.id.into(), multicast_group_id: mg.id,
f_port: 5, f_port: 5,
data: vec![1, 2, 3], data: vec![1, 2, 3],
..Default::default() ..Default::default()
@ -160,13 +160,13 @@ async fn test_multicast() {
multicast_group: mg.clone(), multicast_group: mg.clone(),
multicast_group_queue_items: vec![ multicast_group_queue_items: vec![
multicast::MulticastGroupQueueItem { multicast::MulticastGroupQueueItem {
multicast_group_id: mg.id.into(), multicast_group_id: mg.id,
f_port: 5, f_port: 5,
data: vec![1, 2, 3], data: vec![1, 2, 3],
..Default::default() ..Default::default()
}, },
multicast::MulticastGroupQueueItem { multicast::MulticastGroupQueueItem {
multicast_group_id: mg.id.into(), multicast_group_id: mg.id,
f_port: 6, f_port: 6,
data: vec![1, 2, 3], data: vec![1, 2, 3],
..Default::default() ..Default::default()
@ -209,13 +209,13 @@ async fn test_multicast() {
multicast_group: mg.clone(), multicast_group: mg.clone(),
multicast_group_queue_items: vec![ multicast_group_queue_items: vec![
multicast::MulticastGroupQueueItem { multicast::MulticastGroupQueueItem {
multicast_group_id: mg.id.into(), multicast_group_id: mg.id,
f_port: 5, f_port: 5,
data: vec![2; 300], data: vec![2; 300],
..Default::default() ..Default::default()
}, },
multicast::MulticastGroupQueueItem { multicast::MulticastGroupQueueItem {
multicast_group_id: mg.id.into(), multicast_group_id: mg.id,
f_port: 6, f_port: 6,
data: vec![1, 2, 3], data: vec![1, 2, 3],
..Default::default() ..Default::default()
@ -223,6 +223,18 @@ async fn test_multicast() {
], ],
assert: vec![assert::no_downlink_frame()], assert: vec![assert::no_downlink_frame()],
}, },
MulticastTest {
name: "item discarded because it has expired".into(),
multicast_group: mg.clone(),
multicast_group_queue_items: vec![multicast::MulticastGroupQueueItem {
multicast_group_id: mg.id,
f_port: 5,
data: vec![1, 2, 3],
expires_at: Some(Utc::now() - Duration::seconds(10)),
..Default::default()
}],
assert: vec![assert::no_downlink_frame()],
},
]; ];
for tst in &tests { for tst in &tests {