Update multicast function to return expired queue items.

In case a gateway is offline, associated queue-items would be excluded
by the get_schedulable_queue_items function. With this change when the
queue item has expired it will be returned even if the gateway is
offline. This way, the expired queue item will be deleted by the
multicast flow.
This commit is contained in:
Orne Brocaar 2024-12-05 11:05:21 +00:00
parent 532392abe1
commit 91b9077ba8

View File

@ -670,7 +670,8 @@ pub async fn get_schedulable_queue_items(limit: usize) -> Result<Vec<MulticastGr
on g.gateway_id = qi.gateway_id
where
qi.scheduler_run_after <= $2
and now() - make_interval(secs => g.stats_interval_secs * 2) <= g.last_seen_at
-- check that the gateway is online, except when the item already has expired
and ($2 - make_interval(secs => g.stats_interval_secs * 2) <= g.last_seen_at or expires_at <= $2)
order by
qi.created_at
limit $1
@ -1122,4 +1123,111 @@ pub mod test {
flush_queue(&mg.id.into()).await.unwrap();
assert!(delete_queue_item(&ids[0]).await.is_err());
}
#[tokio::test]
async fn test_get_schedulable_queue_items() {
let _guard = test::prepare().await;
let t = tenant::create(tenant::Tenant {
name: "test-tenant".into(),
can_have_gateways: true,
..Default::default()
})
.await
.unwrap();
let app = application::create(application::Application {
name: "test-app".into(),
tenant_id: t.id,
..Default::default()
})
.await
.unwrap();
let gw = gateway::create(gateway::Gateway {
gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
name: "test-gw".into(),
tenant_id: t.id,
stats_interval_secs: 30,
last_seen_at: Some(Utc::now()),
..Default::default()
})
.await
.unwrap();
let mg = create(MulticastGroup {
application_id: app.id,
name: "test-mg".into(),
region: CommonName::EU868,
mc_addr: DevAddr::from_be_bytes([1, 2, 3, 4]),
mc_nwk_s_key: AES128Key::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8]),
f_cnt: 10,
group_type: "C".into(),
dr: 1,
frequency: 868100000,
class_c_scheduling_type: fields::MulticastGroupSchedulingType::DELAY,
..Default::default()
})
.await
.unwrap();
let mut qi = MulticastGroupQueueItem {
scheduler_run_after: Utc::now(),
multicast_group_id: mg.id,
gateway_id: gw.gateway_id,
f_cnt: mg.f_cnt,
f_port: 10,
data: vec![1, 2, 3],
expires_at: None,
..Default::default()
};
qi = diesel::insert_into(multicast_group_queue_item::table)
.values(&qi)
.get_result(&mut get_async_db_conn().await.unwrap())
.await
.unwrap();
// We expect one queue item.
let out = get_schedulable_queue_items(100).await.unwrap();
assert_eq!(1, out.len());
// We expect zero items because the scheduler_run_after has been updated
// by the get_schedulable_queue_items function.
let out = get_schedulable_queue_items(100).await.unwrap();
assert_eq!(0, out.len());
// Restore scheduler_run_after
diesel::update(multicast_group_queue_item::dsl::multicast_group_queue_item.find(&qi.id))
.set(multicast_group_queue_item::scheduler_run_after.eq(Utc::now()))
.execute(&mut get_async_db_conn().await.unwrap())
.await
.unwrap();
// Set gateway last_seen_at in the past.
gateway::partial_update(
gw.gateway_id,
&gateway::GatewayChangeset {
last_seen_at: Some(Some(Utc::now() - Duration::days(1))),
..Default::default()
},
)
.await
.unwrap();
// We expect zero items, as the gateway is not online.
let out = get_schedulable_queue_items(100).await.unwrap();
assert_eq!(0, out.len());
// Set the expires_at of the queue item to now.
diesel::update(multicast_group_queue_item::dsl::multicast_group_queue_item.find(&qi.id))
.set(multicast_group_queue_item::expires_at.eq(Some(Utc::now())))
.execute(&mut get_async_db_conn().await.unwrap())
.await
.unwrap();
// We expect one item, as it has expired.
let out = get_schedulable_queue_items(100).await.unwrap();
assert_eq!(1, out.len());
}
}