mirror of
https://github.com/chirpstack/chirpstack.git
synced 2025-01-20 11:28:46 +00:00
Cleanup DevEUI from DevAddr > DevEUI set, if DS does not exist.
This commit is contained in:
parent
0fa40717e8
commit
4dd441e85d
@ -4,7 +4,7 @@ use std::io::Cursor;
|
||||
use anyhow::{Context, Result};
|
||||
use prost::Message;
|
||||
use tokio::task;
|
||||
use tracing::{info, trace, warn};
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
use super::error::Error;
|
||||
use super::{get_redis_conn, redis_key};
|
||||
@ -324,16 +324,40 @@ async fn get_dev_euis_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<EUI64>> {
|
||||
.await?
|
||||
}
|
||||
|
||||
async fn remove_dev_eui_from_dev_addr_set(dev_addr: DevAddr, dev_eui: EUI64) -> Result<()> {
|
||||
task::spawn_blocking({
|
||||
let dev_addr = dev_addr;
|
||||
let dev_eui = dev_eui;
|
||||
move || -> Result<()> {
|
||||
let key = redis_key(format!("devaddr:{{{}}}", dev_addr));
|
||||
let mut c = get_redis_conn()?;
|
||||
redis::cmd("SREM")
|
||||
.arg(key)
|
||||
.arg(&dev_eui.to_be_bytes())
|
||||
.query(&mut *c)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
async fn get_for_dev_addr(dev_addr: DevAddr) -> Result<Vec<internal::DeviceSession>> {
|
||||
trace!(dev_addr = %dev_addr, "Getting device-session for DevAddr");
|
||||
let dev_euis = get_dev_euis_for_dev_addr(dev_addr).await?;
|
||||
|
||||
let mut out = Vec::new();
|
||||
for dev_eui in &dev_euis {
|
||||
let ds = match get(dev_eui).await {
|
||||
for dev_eui in dev_euis {
|
||||
let ds = match get(&dev_eui).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(dev_addr = %dev_addr, dev_eui = %dev_eui, error = %e, "Get device-session for DevAddr error");
|
||||
if let Error::NotFound(_) = e {
|
||||
if let Err(e) = remove_dev_eui_from_dev_addr_set(dev_addr, dev_eui).await {
|
||||
error!(dev_addr = %dev_addr, dev_eui = %dev_eui, error = %e, "Remove DevEUI from DevAddr->DevEUI set error");
|
||||
}
|
||||
} else {
|
||||
error!(dev_addr = %dev_addr, dev_eui = %dev_eui, error = %e, "Get device-session for DevEUI error");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@ -667,4 +691,49 @@ pub mod test {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_for_dev_addr() {
|
||||
let _guard = test::prepare().await;
|
||||
|
||||
let dev_eui_1 = EUI64::from_be_bytes([1, 1, 1, 1, 1, 1, 1, 1]);
|
||||
let dev_eui_2 = EUI64::from_be_bytes([2, 2, 2, 2, 2, 2, 2, 2]);
|
||||
let dev_addr = DevAddr::from_be_bytes([1, 2, 3, 4]);
|
||||
|
||||
let ds_1 = internal::DeviceSession {
|
||||
dev_addr: dev_addr.to_vec(),
|
||||
dev_eui: dev_eui_1.to_vec(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let ds_2 = internal::DeviceSession {
|
||||
dev_addr: dev_addr.to_vec(),
|
||||
dev_eui: dev_eui_2.to_vec(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
save(&ds_1).await.unwrap();
|
||||
save(&ds_2).await.unwrap();
|
||||
|
||||
let dss = get_for_dev_addr(dev_addr).await.unwrap();
|
||||
assert_eq!(2, dss.len());
|
||||
|
||||
let dev_euis = get_dev_euis_for_dev_addr(dev_addr).await.unwrap();
|
||||
assert_eq!(2, dev_euis.len());
|
||||
|
||||
// At this point there is still a 'dangling' pointer from DevAddr->DevEUI.
|
||||
delete(&dev_eui_2).await.unwrap();
|
||||
let dev_euis = get_dev_euis_for_dev_addr(dev_addr).await.unwrap();
|
||||
assert_eq!(2, dev_euis.len());
|
||||
|
||||
// This should only return one device-session.
|
||||
let dss = get_for_dev_addr(dev_addr).await.unwrap();
|
||||
assert_eq!(1, dss.len());
|
||||
assert_eq!(dev_eui_1.to_vec(), dss[0].dev_eui);
|
||||
|
||||
// 'dangling' DevAddr->DevEUI pointers have been cleaned up.
|
||||
let dev_euis = get_dev_euis_for_dev_addr(dev_addr).await.unwrap();
|
||||
assert_eq!(1, dev_euis.len());
|
||||
assert_eq!(dev_eui_1, dev_euis[0]);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user