Refactor device lock / scheduler_run_after field setting.

This removes the device lock that is stored in Redis to avoid Class-A
and Class-B / Class-C overlapping. Instead, it consistently sets the
scheduler_run_after timestamp, which indicates if the Class-B /
Class-C scheduler should consider scheduling downlinks for a particular
device.

This also updates the get_with_class_b_c_queue_items function to make
sure that multiple ChirpStack instances can execute the same query at
the same time, without getting the same set of data (as the device
records are locked for update and then updated with a
scheduler_run_after timestamp in the future).
This commit is contained in:
Orne Brocaar 2023-10-16 10:34:37 +01:00
parent fda489d315
commit d9a2eeba3c
5 changed files with 51 additions and 142 deletions

View File

@ -291,13 +291,14 @@ impl Data {
ctx.select_downlink_gateway()?;
if ctx._is_class_c() {
ctx.class_c_update_scheduler_run_after().await?;
ctx.check_for_first_uplink()?;
ctx.get_class_c_device_lock().await?;
ctx.set_immediately()?;
ctx.set_tx_info_for_rx2()?;
}
if ctx._is_class_b() {
ctx.set_tx_info_for_class_b_and_lock_device().await?;
ctx.set_tx_info_for_class_b_and_update_scheduler_run_after()
.await?;
}
if ctx._is_class_a() {
return Err(anyhow!("Invalid device-class"));
@ -925,20 +926,6 @@ impl Data {
Ok(())
}
async fn get_class_c_device_lock(&self) -> Result<()> {
trace!("Getting Class-C device lock");
let conf = config::get();
device::get_lock(
&self.device.dev_eui,
chrono::Duration::from_std(conf.network.scheduler.class_c_lock_duration)?,
)
.await
.context("Get device lock")?;
Ok(())
}
fn set_immediately(&mut self) -> Result<()> {
trace!("Setting immediately flag");
self.immediately = true;
@ -2324,7 +2311,22 @@ impl Data {
Ok(())
}
async fn set_tx_info_for_class_b_and_lock_device(&mut self) -> Result<()> {
async fn class_c_update_scheduler_run_after(&mut self) -> Result<()> {
trace!("Updating scheduler_run_after_ts for Class-C");
let conf = config::get();
let scheduler_run_after_ts = Utc::now() + conf.network.scheduler.class_c_lock_duration;
self.device =
device::set_scheduler_run_after(&self.device.dev_eui, Some(scheduler_run_after_ts))
.await?;
Ok(())
}
// The setting of tx_info and updating update_scheduler_run_after_ts is combined
// as we need to calculate the ping_slot_ts for the tx_info.
async fn set_tx_info_for_class_b_and_update_scheduler_run_after(&mut self) -> Result<()> {
trace!("Setting tx-info for Class-B");
let gw_down = self.downlink_gateway.as_ref().unwrap();
@ -2365,13 +2367,8 @@ impl Data {
})),
});
let scheduler_run_after_ts = ping_slot_ts.to_date_time();
// Try to aquire the device lock.
device::get_lock(&self.device.dev_eui, scheduler_run_after_ts - Utc::now())
.await
.context("Get device lock")?;
// Update the device next scheduler run.
let scheduler_run_after_ts = ping_slot_ts.to_date_time();
trace!(scheduler_run_after = %scheduler_run_after_ts, "Setting scheduler_run_after for device");
self.device =
device::set_scheduler_run_after(&self.device.dev_eui, Some(scheduler_run_after_ts))

View File

@ -13,7 +13,7 @@ use uuid::Uuid;
use lrwn::{DevAddr, EUI64};
use super::schema::{application, device, device_profile, multicast_group_device, tenant};
use super::{error::Error, fields, get_db_conn, get_redis_conn, redis_key};
use super::{error::Error, fields, get_db_conn};
use crate::config;
#[derive(Debug, Clone, Copy, Eq, PartialEq, AsExpression, FromSqlRow)]
@ -539,6 +539,22 @@ pub async fn get_with_class_b_c_queue_items(limit: usize) -> Result<Vec<Device>>
let mut c = get_db_conn()?;
c.transaction::<Vec<Device>, Error, _>(|c| {
let conf = config::get();
// This query will:
// * Select the devices for which a Class-B or Class-C downlink can be scheduled.
// * Lock the device records for update with skip locked such that other
// ChirpStack instances are able to do the same for the remaining devices.
// * Update the scheduler_run_after for these devices to now() + 2 * scheduler
// interval to avoid concurrency issues (other ChirpStack instance scheduling
// the same queue items).
//
// This way, we do not have to keep the device records locked until the scheduler
// finishes its batch as the same set of devices will not be returned until after
// the updated scheduler_run_after. Only if the scheduler takes more time than 2x the
// interval (the scheduler is still working on processing the batch after 2 x interval)
// this might cause issues.
// The alternative would be to keep the transaction open for a long time + keep
// the device records locked during this time which could case issues as well.
diesel::sql_query(
r#"
update
@ -568,6 +584,7 @@ pub async fn get_with_class_b_c_queue_items(limit: usize) -> Result<Vec<Device>>
)
order by d.dev_eui
limit $1
for update skip locked
)
returning *
"#,
@ -585,55 +602,6 @@ pub async fn get_with_class_b_c_queue_items(limit: usize) -> Result<Vec<Device>>
.await?
}
// This sets the lock. In case a lock was already set, it will be overwritten with the new TTL
// value.
pub async fn set_lock(dev_eui: &EUI64, ttl: Duration) -> Result<()> {
task::spawn_blocking({
let dev_eui = *dev_eui;
move || -> Result<()> {
info!(dev_eui = %dev_eui, "Setting device lock");
let key = redis_key(format!("device:{{{}}}:lock", dev_eui));
let mut c = get_redis_conn()?;
redis::cmd("PSETEX")
.arg(key)
.arg(ttl.num_milliseconds())
.arg("lock")
.query(&mut *c)?;
Ok(())
}
})
.await?
}
// This sets the lock. In case a lock was already set, this function will return an error.
pub async fn get_lock(dev_eui: &EUI64, ttl: Duration) -> Result<(), Error> {
task::spawn_blocking({
let dev_eui = *dev_eui;
move || -> Result<(), Error> {
info!(dev_eui = %dev_eui, "Aquiring device lock");
let key = redis_key(format!("device:{{{}}}:lock", dev_eui));
let mut c = get_redis_conn()?;
let set: bool = redis::cmd("SET")
.arg(&key)
.arg("lock")
.arg("PX")
.arg(ttl.num_milliseconds() as usize)
.arg("NX")
.query(&mut *c)?;
if !set {
return Err(Error::AlreadyExists(key));
}
Ok(())
}
})
.await?
}
#[cfg(test)]
pub mod test {
use super::*;
@ -830,45 +798,4 @@ pub mod test {
let res = get_with_class_b_c_queue_items(10).await.unwrap();
assert_eq!(1, res.len());
}
#[tokio::test]
async fn test_get_set_lock() {
let _guard = test::prepare().await;
// This is okay, as we are overwriting the lock
set_lock(
&EUI64::from_be_bytes([1, 1, 1, 1, 1, 1, 1, 1]),
Duration::seconds(1),
)
.await
.unwrap();
set_lock(
&EUI64::from_be_bytes([1, 1, 1, 1, 1, 1, 1, 1]),
Duration::seconds(1),
)
.await
.unwrap();
// This should fail as we are trying to aquire a lock,
// but there is already a lock set.
let res = get_lock(
&EUI64::from_be_bytes([1, 1, 1, 1, 1, 1, 1, 1]),
Duration::seconds(1),
)
.await;
assert!(res.is_err());
get_lock(
&EUI64::from_be_bytes([1, 1, 1, 1, 1, 1, 1, 2]),
Duration::seconds(1),
)
.await
.unwrap();
let res = get_lock(
&EUI64::from_be_bytes([1, 1, 1, 1, 1, 1, 1, 2]),
Duration::seconds(1),
)
.await;
assert!(res.is_err());
}
}

View File

@ -464,13 +464,12 @@ pub fn device_uplink_frame_log(uf: api::UplinkFrameLog) -> Validator {
})
}
pub fn downlink_device_lock(dev_eui: EUI64) -> Validator {
pub fn scheduler_run_after_set(dev_eui: EUI64) -> Validator {
Box::new(move || {
let dev_eui = dev_eui.clone();
Box::pin(async move {
let mut c = get_redis_conn().unwrap();
let key = redis_key(format!("device:{{{}}}:lock", dev_eui));
let _: String = redis::cmd("GET").arg(key).query(&mut *c).unwrap();
let d = device::get(&dev_eui).await.unwrap();
assert!(d.scheduler_run_after.is_some());
})
})
}

View File

@ -1603,22 +1603,22 @@ async fn test_lorawan_10_uplink() {
],
},
Test {
name: "uplink of class-c device sets lock".into(),
name: "uplink of class-c device updates scheduler_run_after".into(),
device_queue_items: vec![],
before_func: Some(Box::new(move || {
let dp_id = dp.id.clone();
let dev_eui = dev.dev_eui;
Box::pin(async move {
let mut dp = device_profile::get(&dp_id).await.unwrap();
dp.supports_class_c = true;
device_profile::update(dp.clone()).await.unwrap();
device::set_enabled_class(&dev_eui, device::DeviceClass::C)
.await
.unwrap();
})
})),
after_func: Some(Box::new(move || {
let dp_id = dp.id.clone();
let dev_eui = dev.dev_eui;
Box::pin(async move {
let mut dp = device_profile::get(&dp_id).await.unwrap();
dp.supports_class_c = false;
device_profile::update(dp).await.unwrap();
device::set_enabled_class(&dev_eui, device::DeviceClass::A)
.await
.unwrap();
})
})),
device_session: Some(ds.clone()),
@ -1643,7 +1643,7 @@ async fn test_lorawan_10_uplink() {
assert: vec![
assert::f_cnt_up(dev.dev_eui.clone(), 11),
assert::n_f_cnt_down(dev.dev_eui.clone(), 5),
assert::downlink_device_lock(dev.dev_eui.clone()),
assert::scheduler_run_after_set(dev.dev_eui.clone()),
],
},
];

View File

@ -118,7 +118,6 @@ impl Data {
ctx.set_device_info()?;
ctx.set_device_gateway_rx_info()?;
ctx.handle_retransmission_reset().await?;
ctx.set_device_lock().await?;
ctx.set_scheduler_run_after().await?;
if !ctx._is_roaming() {
// In case of roaming we do not know the gateways and therefore it must not be
@ -190,7 +189,6 @@ impl Data {
ctx.set_device_info()?;
ctx.set_relay_rx_info()?;
ctx.handle_retransmission_reset().await?;
ctx.set_device_lock().await?;
ctx.decrypt_f_opts_mac_commands()?;
ctx.decrypt_frm_payload()?;
ctx.set_adr()?;
@ -539,18 +537,6 @@ impl Data {
Err(Error::Abort)
}
async fn set_device_lock(&self) -> Result<()> {
trace!("Setting device lock");
let dev = self.device.as_ref().unwrap();
let conf = config::get();
device::set_lock(
&dev.dev_eui,
Duration::from_std(conf.network.scheduler.class_a_lock_duration)?,
)
.await
}
// For Class-B and Class-C devices, set the scheduler_run_after timestamp to avoid collisions with
// the Class-A downlink and Class-B/C scheduler.
async fn set_scheduler_run_after(&mut self) -> Result<()> {