From a7280d9c21aa80c4a14c786417f6b27487816b09 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Thu, 20 Feb 2025 15:42:50 +0000 Subject: [PATCH] WIP. --- chirpstack/src/applayer/fragmentation.rs | 66 ++++ chirpstack/src/applayer/fuota/flow.rs | 420 +++++++++++++++++++++ chirpstack/src/applayer/fuota/mod.rs | 9 + chirpstack/src/applayer/fuota/scheduler.rs | 45 +++ chirpstack/src/applayer/mod.rs | 13 +- chirpstack/src/applayer/multicastsetup.rs | 139 +++++++ chirpstack/src/cmd/root.rs | 3 +- chirpstack/src/storage/fields/mod.rs | 5 +- chirpstack/src/storage/fuota.rs | 15 +- 9 files changed, 707 insertions(+), 8 deletions(-) create mode 100644 chirpstack/src/applayer/fragmentation.rs create mode 100644 chirpstack/src/applayer/fuota/flow.rs create mode 100644 chirpstack/src/applayer/fuota/mod.rs create mode 100644 chirpstack/src/applayer/fuota/scheduler.rs create mode 100644 chirpstack/src/applayer/multicastsetup.rs diff --git a/chirpstack/src/applayer/fragmentation.rs b/chirpstack/src/applayer/fragmentation.rs new file mode 100644 index 00000000..aec9ef81 --- /dev/null +++ b/chirpstack/src/applayer/fragmentation.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use chrono::Utc; +use tracing::{info, warn}; + +use crate::storage::fields::device_profile::Ts004Version; +use crate::storage::{device, device_profile, fuota}; +use lrwn::applayer::fragmentation; + +pub async fn handle_uplink( + dev: &device::Device, + dp: &device_profile::DeviceProfile, + data: &[u8], +) -> Result<()> { + let version = dp + .app_layer_params + .ts004_version + .ok_or_else(|| anyhow!("Device does not support TS004"))?; + + match version { + Ts004Version::V100 => handle_uplink_v100(dev, data).await, + } +} + +async fn handle_uplink_v100(dev: &device::Device, data: &[u8]) -> Result<()> { + let pl = fragmentation::v1::Payload::from_slice(true, data)?; + + match pl { + fragmentation::v1::Payload::FragSessionSetupAns(pl) => { + handle_v1_frag_session_setup_ans(dev, pl).await? + } + _ => {} + } + + Ok(()) +} + +async fn handle_v1_frag_session_setup_ans( + dev: &device::Device, + pl: fragmentation::v1::FragSessionSetupAnsPayload, +) -> Result<()> { + info!("Handling FragSessionSetupAns"); + + let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?; + + if pl.encoding_unsupported + | pl.not_enough_memory + | pl.frag_session_index_not_supported + | pl.wrong_descriptor + { + warn!( + frag_index = pl.frag_index, + encoding_unsupported = pl.encoding_unsupported, + not_enough_memory = pl.not_enough_memory, + frag_session_index_not_supported = pl.frag_session_index_not_supported, + wrong_descriptor = pl.wrong_descriptor, + "FragSessionAns contains errors" + ); + fuota_dev.return_msg = format!("Error: FragSessionAns response encoding_unsupported={}, not_enough_memory={}, frag_session_index_not_supported={}, wrong_descriptor={}", pl.encoding_unsupported, pl.not_enough_memory, pl.frag_session_index_not_supported, pl.wrong_descriptor); + } else { + fuota_dev.frag_session_setup_completed_at = Some(Utc::now()); + } + + let _ = fuota::update_device(fuota_dev).await?; + + Ok(()) +} diff --git a/chirpstack/src/applayer/fuota/flow.rs b/chirpstack/src/applayer/fuota/flow.rs new file mode 100644 index 00000000..8ce7994d --- /dev/null +++ b/chirpstack/src/applayer/fuota/flow.rs @@ -0,0 +1,420 @@ +use anyhow::Result; +use chrono::{TimeDelta, Utc}; +use tracing::info; + +use lrwn::applayer::{fragmentation, multicastsetup}; +use lrwn::region::MacVersion; + +use crate::config; +use crate::downlink; +use crate::gpstime::ToGpsTime; +use crate::storage::fields::FuotaJob; +use crate::storage::{device_keys, device_profile, device_queue, fuota, multicast}; + +pub struct Flow { + job: fuota::FuotaDeploymentJob, + fuota_deployment: fuota::FuotaDeployment, + device_profile: device_profile::DeviceProfile, +} + +impl Flow { + pub async fn handle_job(job: fuota::FuotaDeploymentJob) -> Result<()> { + let fuota_deployment = fuota::get_deployment(job.fuota_deployment_id.into()).await?; + let device_profile = + device_profile::get(&fuota_deployment.device_profile_id.into()).await?; + + let mut flow = Flow { + job, + fuota_deployment, + device_profile, + }; + flow.dispatch().await + } + + async fn dispatch(&mut self) -> Result<()> { + let conf = config::get(); + self.job.attempt_count += 1; + + info!(attempt_count = self.job.attempt_count, "Starting job"); + + let resp = match self.job.job { + FuotaJob::CreateMcGroup => self.create_mc_group().await, + FuotaJob::AddDevsToMcGroup => self.add_devices_to_multicast_group().await, + FuotaJob::AddGwsToMcGroup => self.add_gateways_to_multicast_group().await, + FuotaJob::McGroupSetup => self.multicast_group_setup().await, + FuotaJob::FragSessionSetup => self.fragmentation_session_setup().await, + FuotaJob::McSession => self.multicast_session_setup().await, + FuotaJob::Enqueue => self.enqueue().await, + FuotaJob::FragStatus => self.fragmentation_status().await, + }; + + match resp { + Ok(Some(next_job)) => { + if self.job.job == next_job { + // Re-run the same job in the future. + let job = self.job.clone(); + let _ = fuota::update_job(job).await?; + } else { + // Create the next job (which automatically sets the current job to completed). + let _ = fuota::create_job(fuota::FuotaDeploymentJob { + fuota_deployment_id: self.job.fuota_deployment_id, + job: next_job, + max_retry_count: match next_job { + FuotaJob::FragSessionSetup | FuotaJob::McGroupSetup => { + self.fuota_deployment.unicast_max_retry_count + } + _ => 0, + }, + ..Default::default() + }) + .await?; + } + } + Ok(None) => { + // No further jobs to execute, set the current job to completed. + let mut job = self.job.clone(); + job.completed_at = Some(Utc::now()); + let _ = fuota::update_job(job).await?; + } + Err(e) => { + // Re-run the same job in the future. + let mut job = self.job.clone(); + job.scheduler_run_after = Utc::now() + conf.network.scheduler.interval; + job.return_msg = format!("Error: {}", e); + let _ = fuota::update_job(job).await?; + return Err(e); + } + } + + Ok(()) + } + + async fn create_mc_group(&mut self) -> Result> { + // If this job fails, then there is no need to execute the others. + if self.job.attempt_count - 1 > self.job.max_retry_count { + return Ok(None); + } + + info!("Creating multicast-group for FUOTA deployment"); + + // Get McAppSKey + McNwkSKey. + let mc_app_s_key = multicastsetup::v1::get_mc_app_s_key( + self.fuota_deployment.multicast_key, + self.fuota_deployment.multicast_addr, + )?; + let mc_nwk_s_key = multicastsetup::v1::get_mc_net_s_key( + self.fuota_deployment.multicast_key, + self.fuota_deployment.multicast_addr, + )?; + + let _ = multicast::create(multicast::MulticastGroup { + id: self.fuota_deployment.id, + application_id: self.fuota_deployment.application_id, + name: format!("fuota-{}", self.fuota_deployment.id), + region: self.device_profile.region, + mc_addr: self.fuota_deployment.multicast_addr, + mc_nwk_s_key, + mc_app_s_key, + f_cnt: 0, + group_type: self.fuota_deployment.multicast_group_type.clone(), + frequency: self.fuota_deployment.multicast_frequency, + dr: self.fuota_deployment.multicast_dr, + class_b_ping_slot_nb_k: self.fuota_deployment.multicast_class_b_ping_slot_nb_k, + class_c_scheduling_type: self.fuota_deployment.multicast_class_c_scheduling_type, + ..Default::default() + }) + .await?; + + Ok(Some(FuotaJob::AddDevsToMcGroup)) + } + + async fn add_devices_to_multicast_group(&mut self) -> Result> { + // If this job fails, then there is no need to execute the others. + if self.job.attempt_count - 1 > self.job.max_retry_count { + return Ok(None); + } + + info!("Adding devices to multicast-group"); + + let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + for fuota_d in fuota_devices { + multicast::add_device(&fuota_d.fuota_deployment_id, &fuota_d.dev_eui).await?; + } + + Ok(Some(FuotaJob::AddGwsToMcGroup)) + } + + async fn add_gateways_to_multicast_group(&mut self) -> Result> { + // If this job fails, then there is no need to execute the others. + if self.job.attempt_count - 1 > self.job.max_retry_count { + return Ok(None); + } + + info!("Adding gateways to multicast-group (if any)"); + + let fuota_gws = fuota::get_gateways(self.job.fuota_deployment_id.into(), -1, 0).await?; + for fuota_gw in fuota_gws { + multicast::add_gateway(&fuota_gw.fuota_deployment_id, &fuota_gw.gateway_id).await?; + } + + Ok(Some(FuotaJob::McGroupSetup)) + } + + async fn multicast_group_setup(&mut self) -> Result> { + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count - 1 > self.job.max_retry_count { + return Ok(Some(FuotaJob::FragSessionSetup)); + } + + let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + + // Filter on devices that have not completed the McGroupSetup. + let fuota_devices: Vec = fuota_devices + .into_iter() + .filter(|d| d.mc_group_setup_completed_at.is_none()) + .collect(); + + for fuota_dev in &fuota_devices { + let dev_keys = device_keys::get(&fuota_dev.dev_eui).await?; + let mc_root_key = match self.device_profile.mac_version { + MacVersion::LORAWAN_1_0_0 + | MacVersion::LORAWAN_1_0_1 + | MacVersion::LORAWAN_1_0_2 + | MacVersion::LORAWAN_1_0_3 + | MacVersion::LORAWAN_1_0_4 => { + multicastsetup::v1::get_mc_root_key_for_gen_app_key(dev_keys.gen_app_key)? + } + MacVersion::LORAWAN_1_1_0 | MacVersion::Latest => { + multicastsetup::v1::get_mc_root_key_for_app_key(dev_keys.app_key)? + } + }; + let mc_ke_key = multicastsetup::v1::get_mc_ke_key(mc_root_key)?; + let mc_key_encrypted = + multicastsetup::v1::encrypt_mc_key(mc_ke_key, self.fuota_deployment.multicast_key); + + let pl = multicastsetup::v1::Payload::McGroupSetupReq( + multicastsetup::v1::McGroupSetupReqPayload { + mc_group_id_header: multicastsetup::v1::McGroupSetupReqPayloadMcGroupIdHeader { + mc_group_id: 0, + }, + mc_addr: self.fuota_deployment.multicast_addr, + mc_key_encrypted, + min_mc_f_count: 0, + max_mc_f_count: u32::MAX, + }, + ); + + device_queue::enqueue_item(device_queue::DeviceQueueItem { + dev_eui: fuota_dev.dev_eui, + f_port: self.device_profile.app_layer_params.ts005_f_port.into(), + data: pl.to_vec()?, + ..Default::default() + }) + .await?; + } + + if !fuota_devices.is_empty() { + // There are devices pending setup, we need to re-run this job. + self.job.scheduler_run_after = + Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64); + Ok(Some(FuotaJob::McGroupSetup)) + } else { + // All devices have completed the setup, move on to next job. + Ok(Some(FuotaJob::FragSessionSetup)) + } + } + + async fn fragmentation_session_setup(&mut self) -> Result> { + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count - 1 > self.job.max_retry_count { + return Ok(Some(FuotaJob::McSession)); + } + + let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize; + let fragments = + (self.fuota_deployment.payload.len() as f32 / fragment_size as f32).ceil() as usize; + let padding = + (fragment_size - (self.fuota_deployment.payload.len() % fragment_size)) % fragment_size; + + let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + + // Filter on devices that have completed the previous step, but not yet the FragSessionSetup. + let fuota_devices: Vec = fuota_devices + .into_iter() + .filter(|d| { + d.mc_group_setup_completed_at.is_some() + && d.frag_session_setup_completed_at.is_none() + }) + .collect(); + + for fuota_dev in &fuota_devices { + let pl = fragmentation::v1::Payload::FragSessionSetupReq( + fragmentation::v1::FragSessionSetupReqPayload { + frag_session: fragmentation::v1::FragSessionSetuReqPayloadFragSession { + mc_group_bit_mask: [true, false, false, false], + frag_index: 0, + }, + nb_frag: fragments as u16, + frag_size: fragment_size as u8, + padding: padding as u8, + control: fragmentation::v1::FragSessionSetuReqPayloadControl { + block_ack_delay: 0, + fragmentation_matrix: 0, + }, + descriptor: [0, 0, 0, 0], + }, + ); + + device_queue::enqueue_item(device_queue::DeviceQueueItem { + dev_eui: fuota_dev.dev_eui, + f_port: self.device_profile.app_layer_params.ts004_f_port.into(), + data: pl.to_vec()?, + ..Default::default() + }) + .await?; + } + + if !fuota_devices.is_empty() { + // There are devices pending setup, we need to re-run this job. + self.job.scheduler_run_after = + Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64); + Ok(Some(FuotaJob::FragSessionSetup)) + } else { + // All devices have completed the setup, move on to next job. + Ok(Some(FuotaJob::McSession)) + } + } + + async fn multicast_session_setup(&mut self) -> Result> { + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count - 1 > self.job.max_retry_count { + return Ok(Some(FuotaJob::Enqueue)); + } + + let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + + // Filter on devices that have completed the previous step, but not yet the McSession. + let fuota_devices: Vec = fuota_devices + .into_iter() + .filter(|d| { + d.frag_session_setup_completed_at.is_some() && d.mc_session_completed_at.is_none() + }) + .collect(); + + for fuota_dev in &fuota_devices { + // We want to start the session (retry_count + 1) x the uplink_interval. + // Note that retry_count=0 means only one attempt. + let session_start = (Utc::now() + + TimeDelta::seconds( + (self.job.max_retry_count as i64 + 1) + * self.device_profile.uplink_interval as i64, + )) + .to_gps_time() + .num_seconds() + % (1 << 32); + + let pl = match self.fuota_deployment.multicast_group_type.as_ref() { + "B" => multicastsetup::v1::Payload::McClassBSessionReq( + multicastsetup::v1::McClassBSessionReqPayload { + mc_group_id_header: + multicastsetup::v1::McClassBSessionReqPayloadMcGroupIdHeader { + mc_group_id: 0, + }, + session_time: (session_start - (session_start % 128)) as u32, + time_out_periodicity: + multicastsetup::v1::McClassBSessionReqPayloadTimeOutPeriodicity { + time_out: self.fuota_deployment.multicast_timeout as u8, + periodicity: self.fuota_deployment.multicast_class_b_ping_slot_nb_k + as u8, + }, + dl_frequ: self.fuota_deployment.multicast_frequency as u32, + dr: self.fuota_deployment.multicast_dr as u8, + }, + ), + "C" => multicastsetup::v1::Payload::McClassCSessionReq( + multicastsetup::v1::McClassCSessionReqPayload { + mc_group_id_header: + multicastsetup::v1::McClassCSessionReqPayloadMcGroupIdHeader { + mc_group_id: 0, + }, + session_time: session_start as u32, + session_time_out: + multicastsetup::v1::McClassCSessionReqPayloadSessionTimeOut { + time_out: self.fuota_deployment.multicast_timeout as u8, + }, + dl_frequ: self.fuota_deployment.multicast_frequency as u32, + dr: self.fuota_deployment.multicast_dr as u8, + }, + ), + _ => { + return Err(anyhow!( + "Unsupported group-type: {}", + self.fuota_deployment.multicast_group_type + )) + } + }; + + device_queue::enqueue_item(device_queue::DeviceQueueItem { + dev_eui: fuota_dev.dev_eui, + f_port: self.device_profile.app_layer_params.ts005_f_port.into(), + data: pl.to_vec()?, + ..Default::default() + }) + .await?; + } + + // In this case we need to exactly try the max. attempts, because this is what the + // session-start time calculation is based on. If we continue with enqueueing too + // early, the multicast-session hasn't started yet. + self.job.scheduler_run_after = + Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64); + Ok(Some(FuotaJob::McSession)) + } + + async fn enqueue(&mut self) -> Result> { + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count - 1 > self.job.max_retry_count { + return Ok(Some(FuotaJob::FragStatus)); + } + + let payload_length = self.fuota_deployment.payload.len(); + let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize; + let padding = (fragment_size - (payload_length % fragment_size)) % fragment_size; + + let fragments = (payload_length as f32 / fragment_size as f32).ceil() as usize; + let redundancy = (fragments as f32 + * self.fuota_deployment.fragmentation_redundancy_percentage as f32 + / 100.0) + .ceil() as usize; + + let mut payload = self.fuota_deployment.payload.clone(); + payload.extend_from_slice(&vec![0; padding]); + + let encoded_fragments = fragmentation::v1::encode(&payload, fragment_size, redundancy)?; + + for (i, frag) in encoded_fragments.iter().enumerate() { + let pl = + fragmentation::v1::Payload::DataFragment(fragmentation::v1::DataFragmentPayload { + index_and_n: fragmentation::v1::DataFragmentPayloadIndexAndN { + frag_index: 0, + n: (i + 1) as u16, + }, + data: frag.clone(), + }); + + let _ = downlink::multicast::enqueue(multicast::MulticastGroupQueueItem { + multicast_group_id: self.fuota_deployment.id, + f_port: self.device_profile.app_layer_params.ts004_f_port as i16, + data: pl.to_vec()?, + ..Default::default() + }) + .await?; + } + + Ok(Some(FuotaJob::FragStatus)) + } + + async fn fragmentation_status(&mut self) -> Result> { + Ok(None) + } +} diff --git a/chirpstack/src/applayer/fuota/mod.rs b/chirpstack/src/applayer/fuota/mod.rs new file mode 100644 index 00000000..842d2922 --- /dev/null +++ b/chirpstack/src/applayer/fuota/mod.rs @@ -0,0 +1,9 @@ +use tracing::info; + +pub mod flow; +pub mod scheduler; + +pub async fn setup() { + info!("Setting up FUOTA scheduler loop"); + tokio::spawn(scheduler::scheduler_loop()); +} diff --git a/chirpstack/src/applayer/fuota/scheduler.rs b/chirpstack/src/applayer/fuota/scheduler.rs new file mode 100644 index 00000000..f684792e --- /dev/null +++ b/chirpstack/src/applayer/fuota/scheduler.rs @@ -0,0 +1,45 @@ +use anyhow::Result; +use tokio::time::sleep; +use tracing::{error, span, trace, Instrument, Level}; + +use crate::applayer::fuota::flow; +use crate::config; +use crate::storage::fuota; + +pub async fn scheduler_loop() { + let conf = config::get(); + + loop { + trace!("Starting fuota scheduler_loop run"); + if let Err(err) = schedule_batch(conf.network.scheduler.batch_size).await { + error!(error = %err, "Scheduling FUOTA batch error"); + } else { + trace!("schedule_batch completed without error"); + } + sleep(conf.network.scheduler.interval).await; + } +} + +async fn schedule_batch(size: usize) -> Result<()> { + trace!("Get schedulable fuota jobs"); + let jobs = fuota::get_schedulable_jobs(size).await?; + trace!(job_count = jobs.len(), "Got this number of fuota jobs"); + + let mut handles = vec![]; + + for job in jobs { + // Spawn the batch as async tasks. + let handle = tokio::spawn(async move { + let span = span!(Level::INFO, "job", fuota_deployment_id = %job.fuota_deployment_id, job = %job.job); + + if let Err(e) = flow::Flow::handle_job(job).instrument(span).await { + error!(error = %e, "Handle FUOTA job error"); + } + }); + handles.push(handle); + } + + futures::future::join_all(handles).await; + + Ok(()) +} diff --git a/chirpstack/src/applayer/mod.rs b/chirpstack/src/applayer/mod.rs index ef27a556..f15e3fc7 100644 --- a/chirpstack/src/applayer/mod.rs +++ b/chirpstack/src/applayer/mod.rs @@ -5,6 +5,9 @@ use crate::storage::{device, device_profile}; use chirpstack_api::gw; pub mod clocksync; +pub mod fragmentation; +pub mod fuota; +pub mod multicastsetup; pub async fn handle_uplink( dev: &device::Device, @@ -31,9 +34,15 @@ async fn _handle_uplink( .instrument(span) .await } else if dp.app_layer_params.ts004_f_port == f_port { - unimplemented!() + let span = span!(Level::INFO, "ts004"); + fragmentation::handle_uplink(dev, dp, data) + .instrument(span) + .await } else if dp.app_layer_params.ts005_f_port == f_port { - unimplemented!() + let span = span!(Level::INFO, "ts005"); + multicastsetup::handle_uplink(dev, dp, data) + .instrument(span) + .await } else { return Err(anyhow!("Unexpected f_port {}", f_port)); } diff --git a/chirpstack/src/applayer/multicastsetup.rs b/chirpstack/src/applayer/multicastsetup.rs new file mode 100644 index 00000000..720683dc --- /dev/null +++ b/chirpstack/src/applayer/multicastsetup.rs @@ -0,0 +1,139 @@ +use anyhow::Result; +use chrono::Utc; +use tracing::{info, warn}; + +use crate::storage::fields::device_profile::Ts005Version; +use crate::storage::{device, device_profile, fuota}; +use lrwn::applayer::multicastsetup; + +pub async fn handle_uplink( + dev: &device::Device, + dp: &device_profile::DeviceProfile, + data: &[u8], +) -> Result<()> { + let version = dp + .app_layer_params + .ts005_version + .ok_or_else(|| anyhow!("Device does not support TS005"))?; + + match version { + Ts005Version::V100 => handle_uplink_v100(dev, data).await, + } +} + +async fn handle_uplink_v100(dev: &device::Device, data: &[u8]) -> Result<()> { + let pl = multicastsetup::v1::Payload::from_slice(true, data)?; + + match pl { + multicastsetup::v1::Payload::McGroupSetupAns(pl) => { + handle_v1_mc_group_setup_ans(dev, pl).await? + } + multicastsetup::v1::Payload::McClassBSessionAns(pl) => { + handle_v1_mc_class_b_session_ans(dev, pl).await? + } + multicastsetup::v1::Payload::McClassCSessionAns(pl) => { + handle_v1_mc_class_c_session_ans(dev, pl).await? + } + _ => {} + } + + Ok(()) +} + +async fn handle_v1_mc_group_setup_ans( + dev: &device::Device, + pl: multicastsetup::v1::McGroupSetupAnsPayload, +) -> Result<()> { + info!("Handling McGroupSetupAns"); + + let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?; + + if pl.mc_group_id_header.id_error { + warn!( + mc_group_id = pl.mc_group_id_header.mc_group_id, + id_error = true, + "McGroupSetupAns contains errors" + ); + fuota_dev.return_msg = "Error: McGroupSetupAns response id_error=true".into(); + } else { + fuota_dev.mc_group_setup_completed_at = Some(Utc::now()); + } + + let _ = fuota::update_device(fuota_dev).await?; + + Ok(()) +} + +async fn handle_v1_mc_class_b_session_ans( + dev: &device::Device, + pl: multicastsetup::v1::McClassBSessionAnsPayload, +) -> Result<()> { + info!("Handling McClassBSessionAns"); + + let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?; + + if pl.status_and_mc_group_id.dr_error + | pl.status_and_mc_group_id.freq_error + | pl.status_and_mc_group_id.mc_group_undefined + { + warn!( + dr_error = pl.status_and_mc_group_id.dr_error, + freq_error = pl.status_and_mc_group_id.freq_error, + mc_group_undefined = pl.status_and_mc_group_id.mc_group_undefined, + "McClassBSessionAns contains errors" + ); + + fuota_dev.return_msg = format!("Error: McClassBSessionAns response dr_error: {}, freq_error: {}, mc_group_undefined: {}", + pl.status_and_mc_group_id.dr_error, + pl.status_and_mc_group_id.freq_error, + pl.status_and_mc_group_id.mc_group_undefined, + ); + } else { + info!( + time_to_start = pl.time_to_start.unwrap_or_default(), + "McClassBSessionAns OK" + ); + fuota_dev.mc_session_completed_at = Some(Utc::now()); + } + + let _ = fuota::update_device(fuota_dev).await?; + + Ok(()) +} + +async fn handle_v1_mc_class_c_session_ans( + dev: &device::Device, + pl: multicastsetup::v1::McClassCSessionAnsPayload, +) -> Result<()> { + info!("Handling McClassCSessionAns"); + + let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?; + + if pl.status_and_mc_group_id.dr_error + | pl.status_and_mc_group_id.freq_error + | pl.status_and_mc_group_id.mc_group_undefined + { + warn!( + dr_error = pl.status_and_mc_group_id.dr_error, + freq_error = pl.status_and_mc_group_id.freq_error, + mc_group_undefined = pl.status_and_mc_group_id.mc_group_undefined, + "McClassCSessionAns contains errors" + ); + + fuota_dev.return_msg = format!("Error: McClassCSessionAns response dr_error: {}, freq_error: {}, mc_group_undefined: {}", + pl.status_and_mc_group_id.dr_error, + pl.status_and_mc_group_id.freq_error, + pl.status_and_mc_group_id.mc_group_undefined, + ); + } else { + info!( + time_to_start = pl.time_to_start.unwrap_or_default(), + "McClassCSessionAns OK" + ); + fuota_dev.mc_session_completed_at = Some(Utc::now()); + } + + let _ = fuota::update_device(fuota_dev).await?; + + Ok(()) +} diff --git a/chirpstack/src/cmd/root.rs b/chirpstack/src/cmd/root.rs index c4dc3d0c..e44bca17 100644 --- a/chirpstack/src/cmd/root.rs +++ b/chirpstack/src/cmd/root.rs @@ -5,7 +5,7 @@ use signal_hook_tokio::Signals; use tracing::{info, warn}; use crate::gateway; -use crate::{adr, api, backend, downlink, integration, region, storage}; +use crate::{adr, api, applayer::fuota, backend, downlink, integration, region, storage}; pub async fn run() -> Result<()> { info!( @@ -21,6 +21,7 @@ pub async fn run() -> Result<()> { integration::setup().await?; gateway::backend::setup().await?; downlink::setup().await; + fuota::setup().await; api::setup().await?; let mut signals = Signals::new([SIGINT, SIGTERM]).unwrap(); diff --git a/chirpstack/src/storage/fields/mod.rs b/chirpstack/src/storage/fields/mod.rs index 531915fa..77ff8a85 100644 --- a/chirpstack/src/storage/fields/mod.rs +++ b/chirpstack/src/storage/fields/mod.rs @@ -10,10 +10,7 @@ mod uuid; pub use big_decimal::BigDecimal; pub use dev_nonces::DevNonces; -pub use device_profile::{ - AbpParams, AppLayerParams, ClassBParams, ClassCParams, RelayParams, Ts003Version, Ts004Version, - Ts005Version, -}; +pub use device_profile::{AbpParams, AppLayerParams, ClassBParams, ClassCParams, RelayParams}; pub use device_session::DeviceSession; pub use fuota::{FuotaJob, RequestFragmentationSessionStatus}; pub use key_value::KeyValue; diff --git a/chirpstack/src/storage/fuota.rs b/chirpstack/src/storage/fuota.rs index 18966631..f299f4bd 100644 --- a/chirpstack/src/storage/fuota.rs +++ b/chirpstack/src/storage/fuota.rs @@ -362,6 +362,15 @@ pub async fn get_device( .map_err(|e| Error::from_diesel(e, dev_eui.to_string())) } +pub async fn get_latest_device_by_dev_eui(dev_eui: EUI64) -> Result { + fuota_deployment_device::dsl::fuota_deployment_device + .filter(fuota_deployment_device::dsl::dev_eui.eq(&dev_eui)) + .order_by(fuota_deployment_device::created_at.desc()) + .first(&mut get_async_db_conn().await?) + .await + .map_err(|e| Error::from_diesel(e, dev_eui.to_string())) +} + pub async fn update_device(d: FuotaDeploymentDevice) -> Result { let d: FuotaDeploymentDevice = diesel::update( fuota_deployment_device::dsl::fuota_deployment_device @@ -650,7 +659,11 @@ pub async fn get_max_fragment_size(d: &FuotaDeployment) -> Result { .n - 3; - Ok(max_pl_size) + Ok(if max_pl_size > d.payload.len() { + d.payload.len() + } else { + max_pl_size + }) } pub fn get_multicast_timeout(d: &FuotaDeployment) -> Result {