diff --git a/chirpstack/src/downlink/data.rs b/chirpstack/src/downlink/data.rs index 40bac23f..8bb55513 100644 --- a/chirpstack/src/downlink/data.rs +++ b/chirpstack/src/downlink/data.rs @@ -16,8 +16,9 @@ use crate::storage; use crate::storage::{ application, device::{self, DeviceClass}, - device_gateway, device_profile, device_queue, device_session, downlink_frame, mac_command, - relay, tenant, + device_gateway, device_profile, device_queue, device_session, downlink_frame, + helpers::get_all_device_data, + mac_command, relay, tenant, }; use crate::uplink::{RelayContext, UplinkFrameSet}; use crate::{adr, config, gateway, integration, maccommand, region, sensitivity}; @@ -294,9 +295,7 @@ impl Data { async fn _handle_schedule_next_queue_item(downlink_id: u32, dev: device::Device) -> Result<()> { trace!("Handle schedule next-queue item flow"); - let dp = device_profile::get(&dev.device_profile_id).await?; - let app = application::get(&dev.application_id).await?; - let ten = tenant::get(&app.tenant_id).await?; + let (_, app, ten, dp) = get_all_device_data(dev.dev_eui).await?; let ds = device_session::get(&dev.dev_eui).await?; let rc = region::get(&ds.region_config_id)?; let rn = config::get_region_network(&ds.region_config_id)?; diff --git a/chirpstack/src/storage/helpers.rs b/chirpstack/src/storage/helpers.rs new file mode 100644 index 00000000..0cf9817a --- /dev/null +++ b/chirpstack/src/storage/helpers.rs @@ -0,0 +1,28 @@ +use diesel::prelude::*; +use tokio::task; + +use super::schema::{application, device, device_profile, tenant}; +use super::{ + application::Application, device::Device, device_profile::DeviceProfile, tenant::Tenant, +}; +use super::{error::Error, get_db_conn}; +use lrwn::EUI64; + +pub async fn get_all_device_data( + dev_eui: EUI64, +) -> Result<(Device, Application, Tenant, DeviceProfile), Error> { + task::spawn_blocking({ + move || -> Result<(Device, Application, Tenant, DeviceProfile), Error> { + let mut c = get_db_conn()?; + let res = device::table + .inner_join(application::table) + .inner_join(tenant::table.on(application::dsl::tenant_id.eq(tenant::dsl::id))) + .inner_join(device_profile::table) + .filter(device::dsl::dev_eui.eq(&dev_eui)) + .first::<(Device, Application, Tenant, DeviceProfile)>(&mut c) + .map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?; + Ok(res) + } + }) + .await? +} diff --git a/chirpstack/src/storage/mod.rs b/chirpstack/src/storage/mod.rs index 930c38ab..c060cf81 100644 --- a/chirpstack/src/storage/mod.rs +++ b/chirpstack/src/storage/mod.rs @@ -32,6 +32,7 @@ pub mod schema; pub mod search; pub mod tenant; pub mod user; +pub mod helpers; pub type PgPool = Pool>; pub type PgPoolConnection = PooledConnection>; diff --git a/chirpstack/src/uplink/data.rs b/chirpstack/src/uplink/data.rs index e177f7bd..2a58f600 100644 --- a/chirpstack/src/uplink/data.rs +++ b/chirpstack/src/uplink/data.rs @@ -17,7 +17,9 @@ use crate::storage::error::Error as StorageError; use crate::storage::{ application, device::{self, DeviceClass}, - device_gateway, device_profile, device_queue, device_session, fields, metrics, tenant, + device_gateway, device_profile, device_queue, device_session, fields, + helpers::get_all_device_data, + metrics, tenant, }; use crate::{codec, config, downlink, integration, maccommand, region, stream}; use chirpstack_api::{integration as integration_pb, internal, stream as stream_pb}; @@ -111,16 +113,12 @@ impl Data { ctx.handle_passive_roaming_device().await?; ctx.get_device_session().await?; - - ctx.get_device().await?; + ctx.get_device_data().await?; // Add dev_eui to span let span = tracing::Span::current(); span.record("dev_eui", ctx.device.as_ref().unwrap().dev_eui.to_string()); - ctx.get_device_profile().await?; - ctx.get_application().await?; - ctx.get_tenant().await?; ctx.abort_on_device_is_disabled().await?; ctx.set_device_info()?; ctx.set_device_gateway_rx_info()?; @@ -188,10 +186,7 @@ impl Data { }; ctx.get_device_session_relayed().await?; - ctx.get_device().await?; - ctx.get_device_profile().await?; - ctx.get_application().await?; - ctx.get_tenant().await?; + ctx.get_device_data().await?; ctx.abort_on_device_is_disabled().await?; ctx.set_device_info()?; ctx.set_relay_rx_info()?; @@ -355,37 +350,23 @@ impl Data { Ok(()) } - async fn get_device(&mut self) -> Result<()> { - trace!("Getting device"); + async fn get_device_data(&mut self) -> Result<()> { + trace!("Getting device data"); let dev_eui = lrwn::EUI64::from_slice(&self.device_session.as_ref().unwrap().dev_eui)?; - self.device = Some(device::get(&dev_eui).await?); - Ok(()) - } + let (dev, app, t, dp) = get_all_device_data(dev_eui).await?; - async fn get_device_profile(&mut self) -> Result<()> { - trace!("Getting the device-profile"); - let dp = device_profile::get(&self.device.as_ref().unwrap().device_profile_id).await?; if dp.region != self.uplink_frame_set.region_common_name { return Err(anyhow!("Invalid device-profile region")); } + + self.tenant = Some(t); + self.application = Some(app); self.device_profile = Some(dp); + self.device = Some(dev); Ok(()) } - async fn get_application(&mut self) -> Result<()> { - trace!("Getting application"); - self.application = - Some(application::get(&self.device.as_ref().unwrap().application_id).await?); - Ok(()) - } - - async fn get_tenant(&mut self) -> Result<()> { - trace!("Getting tenant"); - self.tenant = Some(tenant::get(&self.application.as_ref().unwrap().tenant_id).await?); - Ok(()) - } - fn set_device_info(&mut self) -> Result<()> { trace!("Setting device-info"); diff --git a/chirpstack/src/uplink/join.rs b/chirpstack/src/uplink/join.rs index fd99ddda..9e306d7c 100644 --- a/chirpstack/src/uplink/join.rs +++ b/chirpstack/src/uplink/join.rs @@ -26,6 +26,7 @@ use crate::storage::{ device::{self, DeviceClass}, device_keys, device_profile, device_queue, error::Error as StorageError, + helpers::get_all_device_data, metrics, tenant, }; use crate::{config, devaddr::get_random_dev_addr, downlink, integration, region, stream}; @@ -120,11 +121,8 @@ impl JoinRequest { ctx.join_request.as_ref().unwrap().dev_eui.to_string(), ); - ctx.get_device_or_try_pr_roaming().await?; + ctx.get_device_data_or_try_pr_roaming().await?; ctx.get_device_keys_or_js_client().await?; // used to validate MIC + if we need external JS - ctx.get_application().await?; - ctx.get_tenant().await?; - ctx.get_device_profile().await?; ctx.set_device_info()?; ctx.filter_rx_info_by_tenant()?; ctx.filter_rx_info_by_region_config_id()?; @@ -178,11 +176,8 @@ impl JoinRequest { }; ctx.get_join_request_payload_relayed()?; - ctx.get_device().await?; + ctx.get_device_data().await?; ctx.get_device_keys_or_js_client().await?; - ctx.get_application().await?; - ctx.get_tenant().await?; - ctx.get_device_profile().await?; ctx.set_device_info()?; ctx.set_relay_rx_info()?; ctx.abort_on_device_is_disabled()?; @@ -237,11 +232,21 @@ impl JoinRequest { Ok(()) } - async fn get_device(&mut self) -> Result<()> { - trace!("Getting device"); + async fn get_device_data(&mut self) -> Result<()> { + trace!("Getting device data"); let jr = self.join_request.as_ref().unwrap(); - let dev = device::get(&jr.dev_eui).await?; + + let (dev, app, t, dp) = get_all_device_data(jr.dev_eui).await?; + + if dp.region != self.uplink_frame_set.region_common_name { + return Err(anyhow!("Invalid device-profile region")); + } + + self.tenant = Some(t); + self.application = Some(app); + self.device_profile = Some(dp); self.device = Some(dev); + Ok(()) } @@ -268,10 +273,10 @@ impl JoinRequest { Ok(()) } - async fn get_device_or_try_pr_roaming(&mut self) -> Result<()> { + async fn get_device_data_or_try_pr_roaming(&mut self) -> Result<()> { trace!("Getting device"); let jr = self.join_request.as_ref().unwrap(); - let dev = match device::get(&jr.dev_eui).await { + let (dev, app, t, dp) = match get_all_device_data(jr.dev_eui).await { Ok(v) => v, Err(e) => { if let StorageError::NotFound(_) = e { @@ -289,32 +294,15 @@ impl JoinRequest { } }; - self.device = Some(dev); - Ok(()) - } - - async fn get_application(&mut self) -> Result<()> { - trace!("Getting application"); - self.application = - Some(application::get(&self.device.as_ref().unwrap().application_id).await?); - Ok(()) - } - - async fn get_tenant(&mut self) -> Result<()> { - trace!("Getting tenant"); - self.tenant = Some(tenant::get(&self.application.as_ref().unwrap().tenant_id).await?); - Ok(()) - } - - async fn get_device_profile(&mut self) -> Result<()> { - trace!("Getting device-profile"); - - let dp = device_profile::get(&self.device.as_ref().unwrap().device_profile_id).await?; if dp.region != self.uplink_frame_set.region_common_name { return Err(anyhow!("Invalid device-profile region")); } + self.tenant = Some(t); + self.application = Some(app); self.device_profile = Some(dp); + self.device = Some(dev); + Ok(()) } diff --git a/chirpstack/src/uplink/join_sns.rs b/chirpstack/src/uplink/join_sns.rs index e35a542c..a956dbf0 100644 --- a/chirpstack/src/uplink/join_sns.rs +++ b/chirpstack/src/uplink/join_sns.rs @@ -12,6 +12,7 @@ use crate::storage::{ device::{self, DeviceClass}, device_keys, device_profile, device_queue, device_session, error::Error as StorageError, + helpers::get_all_device_data, metrics, tenant, }; use crate::{config, devaddr::get_random_dev_addr, integration, region, stream}; @@ -81,11 +82,8 @@ impl JoinRequest { }; ctx.get_join_request_payload()?; - ctx.get_device().await?; + ctx.get_device_data().await?; ctx.get_device_keys_or_js_client().await?; - ctx.get_application().await?; - ctx.get_tenant().await?; - ctx.get_device_profile().await?; ctx.set_device_info()?; ctx.abort_on_device_is_disabled()?; ctx.abort_on_otaa_is_disabled()?; @@ -122,11 +120,20 @@ impl JoinRequest { Ok(()) } - async fn get_device(&mut self) -> Result<()> { - trace!("Getting device"); + async fn get_device_data(&mut self) -> Result<()> { + trace!("Getting device data"); let jr = self.join_request.as_ref().unwrap(); - let dev = device::get(&jr.dev_eui).await?; + let (dev, app, t, dp) = get_all_device_data(jr.dev_eui).await?; + + if dp.region != self.uplink_frame_set.region_common_name { + return Err(anyhow!("Invalid device-profile region")); + } + + self.tenant = Some(t); + self.application = Some(app); + self.device_profile = Some(dp); self.device = Some(dev); + Ok(()) } @@ -153,31 +160,6 @@ impl JoinRequest { Ok(()) } - async fn get_application(&mut self) -> Result<()> { - trace!("Getting application"); - self.application = - Some(application::get(&self.device.as_ref().unwrap().application_id).await?); - Ok(()) - } - - async fn get_tenant(&mut self) -> Result<()> { - trace!("Getting tenant"); - self.tenant = Some(tenant::get(&self.application.as_ref().unwrap().tenant_id).await?); - Ok(()) - } - - async fn get_device_profile(&mut self) -> Result<()> { - trace!("Getting device-profile"); - - let dp = device_profile::get(&self.device.as_ref().unwrap().device_profile_id).await?; - if dp.region != self.uplink_frame_set.region_common_name { - return Err(anyhow!("Invalid device-profile region")); - } - - self.device_profile = Some(dp); - Ok(()) - } - fn set_device_info(&mut self) -> Result<()> { let tenant = self.tenant.as_ref().unwrap(); let app = self.application.as_ref().unwrap();