Get all device data in one query.

Instead of querying the device, device-profile, tenant and application
in separate queries.
This commit is contained in:
Orne Brocaar 2023-11-15 12:49:13 +00:00
parent f81b868a31
commit f7cd5a5a58
6 changed files with 81 additions and 102 deletions
chirpstack/src

@ -16,8 +16,9 @@ use crate::storage;
use crate::storage::{
application,
device::{self, DeviceClass},
device_gateway, device_profile, device_queue, device_session, downlink_frame, mac_command,
relay, tenant,
device_gateway, device_profile, device_queue, device_session, downlink_frame,
helpers::get_all_device_data,
mac_command, relay, tenant,
};
use crate::uplink::{RelayContext, UplinkFrameSet};
use crate::{adr, config, gateway, integration, maccommand, region, sensitivity};
@ -294,9 +295,7 @@ impl Data {
async fn _handle_schedule_next_queue_item(downlink_id: u32, dev: device::Device) -> Result<()> {
trace!("Handle schedule next-queue item flow");
let dp = device_profile::get(&dev.device_profile_id).await?;
let app = application::get(&dev.application_id).await?;
let ten = tenant::get(&app.tenant_id).await?;
let (_, app, ten, dp) = get_all_device_data(dev.dev_eui).await?;
let ds = device_session::get(&dev.dev_eui).await?;
let rc = region::get(&ds.region_config_id)?;
let rn = config::get_region_network(&ds.region_config_id)?;

@ -0,0 +1,28 @@
use diesel::prelude::*;
use tokio::task;
use super::schema::{application, device, device_profile, tenant};
use super::{
application::Application, device::Device, device_profile::DeviceProfile, tenant::Tenant,
};
use super::{error::Error, get_db_conn};
use lrwn::EUI64;
pub async fn get_all_device_data(
dev_eui: EUI64,
) -> Result<(Device, Application, Tenant, DeviceProfile), Error> {
task::spawn_blocking({
move || -> Result<(Device, Application, Tenant, DeviceProfile), Error> {
let mut c = get_db_conn()?;
let res = device::table
.inner_join(application::table)
.inner_join(tenant::table.on(application::dsl::tenant_id.eq(tenant::dsl::id)))
.inner_join(device_profile::table)
.filter(device::dsl::dev_eui.eq(&dev_eui))
.first::<(Device, Application, Tenant, DeviceProfile)>(&mut c)
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
Ok(res)
}
})
.await?
}

@ -32,6 +32,7 @@ pub mod schema;
pub mod search;
pub mod tenant;
pub mod user;
pub mod helpers;
pub type PgPool = Pool<ConnectionManager<PgConnection>>;
pub type PgPoolConnection = PooledConnection<ConnectionManager<PgConnection>>;

@ -17,7 +17,9 @@ use crate::storage::error::Error as StorageError;
use crate::storage::{
application,
device::{self, DeviceClass},
device_gateway, device_profile, device_queue, device_session, fields, metrics, tenant,
device_gateway, device_profile, device_queue, device_session, fields,
helpers::get_all_device_data,
metrics, tenant,
};
use crate::{codec, config, downlink, integration, maccommand, region, stream};
use chirpstack_api::{integration as integration_pb, internal, stream as stream_pb};
@ -111,16 +113,12 @@ impl Data {
ctx.handle_passive_roaming_device().await?;
ctx.get_device_session().await?;
ctx.get_device().await?;
ctx.get_device_data().await?;
// Add dev_eui to span
let span = tracing::Span::current();
span.record("dev_eui", ctx.device.as_ref().unwrap().dev_eui.to_string());
ctx.get_device_profile().await?;
ctx.get_application().await?;
ctx.get_tenant().await?;
ctx.abort_on_device_is_disabled().await?;
ctx.set_device_info()?;
ctx.set_device_gateway_rx_info()?;
@ -188,10 +186,7 @@ impl Data {
};
ctx.get_device_session_relayed().await?;
ctx.get_device().await?;
ctx.get_device_profile().await?;
ctx.get_application().await?;
ctx.get_tenant().await?;
ctx.get_device_data().await?;
ctx.abort_on_device_is_disabled().await?;
ctx.set_device_info()?;
ctx.set_relay_rx_info()?;
@ -355,37 +350,23 @@ impl Data {
Ok(())
}
async fn get_device(&mut self) -> Result<()> {
trace!("Getting device");
async fn get_device_data(&mut self) -> Result<()> {
trace!("Getting device data");
let dev_eui = lrwn::EUI64::from_slice(&self.device_session.as_ref().unwrap().dev_eui)?;
self.device = Some(device::get(&dev_eui).await?);
Ok(())
}
let (dev, app, t, dp) = get_all_device_data(dev_eui).await?;
async fn get_device_profile(&mut self) -> Result<()> {
trace!("Getting the device-profile");
let dp = device_profile::get(&self.device.as_ref().unwrap().device_profile_id).await?;
if dp.region != self.uplink_frame_set.region_common_name {
return Err(anyhow!("Invalid device-profile region"));
}
self.tenant = Some(t);
self.application = Some(app);
self.device_profile = Some(dp);
self.device = Some(dev);
Ok(())
}
async fn get_application(&mut self) -> Result<()> {
trace!("Getting application");
self.application =
Some(application::get(&self.device.as_ref().unwrap().application_id).await?);
Ok(())
}
async fn get_tenant(&mut self) -> Result<()> {
trace!("Getting tenant");
self.tenant = Some(tenant::get(&self.application.as_ref().unwrap().tenant_id).await?);
Ok(())
}
fn set_device_info(&mut self) -> Result<()> {
trace!("Setting device-info");

@ -26,6 +26,7 @@ use crate::storage::{
device::{self, DeviceClass},
device_keys, device_profile, device_queue,
error::Error as StorageError,
helpers::get_all_device_data,
metrics, tenant,
};
use crate::{config, devaddr::get_random_dev_addr, downlink, integration, region, stream};
@ -120,11 +121,8 @@ impl JoinRequest {
ctx.join_request.as_ref().unwrap().dev_eui.to_string(),
);
ctx.get_device_or_try_pr_roaming().await?;
ctx.get_device_data_or_try_pr_roaming().await?;
ctx.get_device_keys_or_js_client().await?; // used to validate MIC + if we need external JS
ctx.get_application().await?;
ctx.get_tenant().await?;
ctx.get_device_profile().await?;
ctx.set_device_info()?;
ctx.filter_rx_info_by_tenant()?;
ctx.filter_rx_info_by_region_config_id()?;
@ -178,11 +176,8 @@ impl JoinRequest {
};
ctx.get_join_request_payload_relayed()?;
ctx.get_device().await?;
ctx.get_device_data().await?;
ctx.get_device_keys_or_js_client().await?;
ctx.get_application().await?;
ctx.get_tenant().await?;
ctx.get_device_profile().await?;
ctx.set_device_info()?;
ctx.set_relay_rx_info()?;
ctx.abort_on_device_is_disabled()?;
@ -237,11 +232,21 @@ impl JoinRequest {
Ok(())
}
async fn get_device(&mut self) -> Result<()> {
trace!("Getting device");
async fn get_device_data(&mut self) -> Result<()> {
trace!("Getting device data");
let jr = self.join_request.as_ref().unwrap();
let dev = device::get(&jr.dev_eui).await?;
let (dev, app, t, dp) = get_all_device_data(jr.dev_eui).await?;
if dp.region != self.uplink_frame_set.region_common_name {
return Err(anyhow!("Invalid device-profile region"));
}
self.tenant = Some(t);
self.application = Some(app);
self.device_profile = Some(dp);
self.device = Some(dev);
Ok(())
}
@ -268,10 +273,10 @@ impl JoinRequest {
Ok(())
}
async fn get_device_or_try_pr_roaming(&mut self) -> Result<()> {
async fn get_device_data_or_try_pr_roaming(&mut self) -> Result<()> {
trace!("Getting device");
let jr = self.join_request.as_ref().unwrap();
let dev = match device::get(&jr.dev_eui).await {
let (dev, app, t, dp) = match get_all_device_data(jr.dev_eui).await {
Ok(v) => v,
Err(e) => {
if let StorageError::NotFound(_) = e {
@ -289,32 +294,15 @@ impl JoinRequest {
}
};
self.device = Some(dev);
Ok(())
}
async fn get_application(&mut self) -> Result<()> {
trace!("Getting application");
self.application =
Some(application::get(&self.device.as_ref().unwrap().application_id).await?);
Ok(())
}
async fn get_tenant(&mut self) -> Result<()> {
trace!("Getting tenant");
self.tenant = Some(tenant::get(&self.application.as_ref().unwrap().tenant_id).await?);
Ok(())
}
async fn get_device_profile(&mut self) -> Result<()> {
trace!("Getting device-profile");
let dp = device_profile::get(&self.device.as_ref().unwrap().device_profile_id).await?;
if dp.region != self.uplink_frame_set.region_common_name {
return Err(anyhow!("Invalid device-profile region"));
}
self.tenant = Some(t);
self.application = Some(app);
self.device_profile = Some(dp);
self.device = Some(dev);
Ok(())
}

@ -12,6 +12,7 @@ use crate::storage::{
device::{self, DeviceClass},
device_keys, device_profile, device_queue, device_session,
error::Error as StorageError,
helpers::get_all_device_data,
metrics, tenant,
};
use crate::{config, devaddr::get_random_dev_addr, integration, region, stream};
@ -81,11 +82,8 @@ impl JoinRequest {
};
ctx.get_join_request_payload()?;
ctx.get_device().await?;
ctx.get_device_data().await?;
ctx.get_device_keys_or_js_client().await?;
ctx.get_application().await?;
ctx.get_tenant().await?;
ctx.get_device_profile().await?;
ctx.set_device_info()?;
ctx.abort_on_device_is_disabled()?;
ctx.abort_on_otaa_is_disabled()?;
@ -122,11 +120,20 @@ impl JoinRequest {
Ok(())
}
async fn get_device(&mut self) -> Result<()> {
trace!("Getting device");
async fn get_device_data(&mut self) -> Result<()> {
trace!("Getting device data");
let jr = self.join_request.as_ref().unwrap();
let dev = device::get(&jr.dev_eui).await?;
let (dev, app, t, dp) = get_all_device_data(jr.dev_eui).await?;
if dp.region != self.uplink_frame_set.region_common_name {
return Err(anyhow!("Invalid device-profile region"));
}
self.tenant = Some(t);
self.application = Some(app);
self.device_profile = Some(dp);
self.device = Some(dev);
Ok(())
}
@ -153,31 +160,6 @@ impl JoinRequest {
Ok(())
}
async fn get_application(&mut self) -> Result<()> {
trace!("Getting application");
self.application =
Some(application::get(&self.device.as_ref().unwrap().application_id).await?);
Ok(())
}
async fn get_tenant(&mut self) -> Result<()> {
trace!("Getting tenant");
self.tenant = Some(tenant::get(&self.application.as_ref().unwrap().tenant_id).await?);
Ok(())
}
async fn get_device_profile(&mut self) -> Result<()> {
trace!("Getting device-profile");
let dp = device_profile::get(&self.device.as_ref().unwrap().device_profile_id).await?;
if dp.region != self.uplink_frame_set.region_common_name {
return Err(anyhow!("Invalid device-profile region"));
}
self.device_profile = Some(dp);
Ok(())
}
fn set_device_info(&mut self) -> Result<()> {
let tenant = self.tenant.as_ref().unwrap();
let app = self.application.as_ref().unwrap();