From 91b9077ba8266978ef882bdb988e5222f41115d3 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Thu, 5 Dec 2024 11:05:21 +0000 Subject: [PATCH] Update multicast function to return expired queue items. In case a gateway is offline, associated queue-items would be excluded by the get_schedulable_queue_items function. With this change when the queue item has expired it will be returned even if the gateway is offline. This way, the expired queue item will be deleted by the multicast flow. --- chirpstack/src/storage/multicast.rs | 110 +++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) diff --git a/chirpstack/src/storage/multicast.rs b/chirpstack/src/storage/multicast.rs index 9e4c8125..09527f3f 100644 --- a/chirpstack/src/storage/multicast.rs +++ b/chirpstack/src/storage/multicast.rs @@ -670,7 +670,8 @@ pub async fn get_schedulable_queue_items(limit: usize) -> Result g.stats_interval_secs * 2) <= g.last_seen_at + -- check that the gateway is online, except when the item already has expired + and ($2 - make_interval(secs => g.stats_interval_secs * 2) <= g.last_seen_at or expires_at <= $2) order by qi.created_at limit $1 @@ -1122,4 +1123,111 @@ pub mod test { flush_queue(&mg.id.into()).await.unwrap(); assert!(delete_queue_item(&ids[0]).await.is_err()); } + + #[tokio::test] + async fn test_get_schedulable_queue_items() { + let _guard = test::prepare().await; + + let t = tenant::create(tenant::Tenant { + name: "test-tenant".into(), + can_have_gateways: true, + ..Default::default() + }) + .await + .unwrap(); + + let app = application::create(application::Application { + name: "test-app".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + + let gw = gateway::create(gateway::Gateway { + gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]), + name: "test-gw".into(), + tenant_id: t.id, + stats_interval_secs: 30, + last_seen_at: Some(Utc::now()), + ..Default::default() + }) + .await + .unwrap(); + + let mg = create(MulticastGroup { + application_id: app.id, + name: "test-mg".into(), + region: CommonName::EU868, + mc_addr: DevAddr::from_be_bytes([1, 2, 3, 4]), + mc_nwk_s_key: AES128Key::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8]), + f_cnt: 10, + group_type: "C".into(), + dr: 1, + frequency: 868100000, + class_c_scheduling_type: fields::MulticastGroupSchedulingType::DELAY, + ..Default::default() + }) + .await + .unwrap(); + + let mut qi = MulticastGroupQueueItem { + scheduler_run_after: Utc::now(), + multicast_group_id: mg.id, + gateway_id: gw.gateway_id, + f_cnt: mg.f_cnt, + f_port: 10, + data: vec![1, 2, 3], + expires_at: None, + ..Default::default() + }; + + qi = diesel::insert_into(multicast_group_queue_item::table) + .values(&qi) + .get_result(&mut get_async_db_conn().await.unwrap()) + .await + .unwrap(); + + // We expect one queue item. + let out = get_schedulable_queue_items(100).await.unwrap(); + assert_eq!(1, out.len()); + + // We expect zero items because the scheduler_run_after has been updated + // by the get_schedulable_queue_items function. + let out = get_schedulable_queue_items(100).await.unwrap(); + assert_eq!(0, out.len()); + + // Restore scheduler_run_after + diesel::update(multicast_group_queue_item::dsl::multicast_group_queue_item.find(&qi.id)) + .set(multicast_group_queue_item::scheduler_run_after.eq(Utc::now())) + .execute(&mut get_async_db_conn().await.unwrap()) + .await + .unwrap(); + + // Set gateway last_seen_at in the past. + gateway::partial_update( + gw.gateway_id, + &gateway::GatewayChangeset { + last_seen_at: Some(Some(Utc::now() - Duration::days(1))), + ..Default::default() + }, + ) + .await + .unwrap(); + + // We expect zero items, as the gateway is not online. + let out = get_schedulable_queue_items(100).await.unwrap(); + assert_eq!(0, out.len()); + + // Set the expires_at of the queue item to now. + diesel::update(multicast_group_queue_item::dsl::multicast_group_queue_item.find(&qi.id)) + .set(multicast_group_queue_item::expires_at.eq(Some(Utc::now()))) + .execute(&mut get_async_db_conn().await.unwrap()) + .await + .unwrap(); + + // We expect one item, as it has expired. + let out = get_schedulable_queue_items(100).await.unwrap(); + assert_eq!(1, out.len()); + } }