From bbdf2dd78193481bb977031dd042f7ca93466ff6 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Wed, 5 Mar 2025 16:33:51 +0000 Subject: [PATCH] Error if there are no fuota devices + cleanup mc group. In case there are no fuota devices (e.g. all devices failed the previous step), this will log a warning and the flow will continue with multicast cleanup and completion steps. --- chirpstack/src/applayer/fuota/flow.rs | 54 +++++++++++++++++-- chirpstack/src/storage/fields/fuota.rs | 3 ++ chirpstack/src/storage/fuota.rs | 2 +- .../views/fuota/FuotaDeploymentDashboard.tsx | 1 + 4 files changed, 56 insertions(+), 4 deletions(-) diff --git a/chirpstack/src/applayer/fuota/flow.rs b/chirpstack/src/applayer/fuota/flow.rs index 92e840bf..f2a70fdd 100644 --- a/chirpstack/src/applayer/fuota/flow.rs +++ b/chirpstack/src/applayer/fuota/flow.rs @@ -47,6 +47,7 @@ impl Flow { FuotaJob::McSession => self.multicast_session_setup().await, FuotaJob::Enqueue => self.enqueue().await, FuotaJob::FragStatus => self.fragmentation_status().await, + FuotaJob::DeleteMcGroup => self.delete_mc_group().await, FuotaJob::Complete => self.complete().await, }; @@ -192,6 +193,11 @@ impl Flow { .filter(|d| d.mc_group_setup_completed_at.is_none()) .collect(); + if fuota_devices.is_empty() { + self.job.error_msg = "There are no devices available to complete this step".into(); + return Ok(None); + } + for fuota_dev in &fuota_devices { let dev_keys = device_keys::get(&fuota_dev.dev_eui).await?; let mc_root_key = match self.device_profile.mac_version { @@ -272,6 +278,11 @@ impl Flow { }) .collect(); + if fuota_devices.is_empty() { + self.job.error_msg = "There are no devices available to complete this step".into(); + return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); + } + for fuota_dev in &fuota_devices { let pl = fragmentation::v1::Payload::FragSessionSetupReq( fragmentation::v1::FragSessionSetupReqPayload { @@ -333,6 +344,11 @@ impl Flow { }) .collect(); + if fuota_devices.is_empty() { + self.job.error_msg = "There are no devices available to complete this step".into(); + return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); + } + for fuota_dev in &fuota_devices { // We want to start the session (retry_count + 1) x the uplink_interval. // Note that retry_count=0 means only one attempt. @@ -416,6 +432,19 @@ impl Flow { info!("Enqueueing fragmented payload to multicast group"); self.job.attempt_count += 1; + let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + + // Filter on devices that have completed the previous step, but not yet the McSession. + let fuota_devices: Vec = fuota_devices + .into_iter() + .filter(|d| d.mc_session_completed_at.is_some()) + .collect(); + + if fuota_devices.is_empty() { + self.job.error_msg = "There are no devices available to complete this step".into(); + return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); + } + let payload_length = self.fuota_deployment.payload.len(); let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize; let padding = (fragment_size - (payload_length % fragment_size)) % fragment_size; @@ -452,7 +481,7 @@ impl Flow { match self.fuota_deployment.request_fragmentation_session_status { RequestFragmentationSessionStatus::NoRequest => { - Ok(Some((FuotaJob::Complete, Utc::now()))) + Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))) } RequestFragmentationSessionStatus::AfterFragEnqueue => { Ok(Some((FuotaJob::FragStatus, Utc::now()))) @@ -473,7 +502,7 @@ impl Flow { async fn fragmentation_status(&mut self) -> Result)>> { // Proceed with next step after reaching the max attempts. if self.job.attempt_count > self.job.max_retry_count { - return Ok(Some((FuotaJob::Complete, Utc::now()))); + return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); } info!("Enqueue FragSessionStatusReq"); @@ -488,6 +517,11 @@ impl Flow { .filter(|d| d.mc_session_completed_at.is_some() && d.frag_status_completed_at.is_none()) .collect(); + if fuota_devices.is_empty() { + self.job.error_msg = "There are no devices available to complete this step".into(); + return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); + } + for fuota_dev in &fuota_devices { let pl = fragmentation::v1::Payload::FragSessionStatusReq( fragmentation::v1::FragSessionStatusReqPayload { @@ -511,10 +545,24 @@ impl Flow { Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64); Ok(Some((FuotaJob::FragStatus, scheduler_run_after))) } else { - Ok(Some((FuotaJob::Complete, Utc::now()))) + Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))) } } + async fn delete_mc_group(&mut self) -> Result)>> { + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count > self.job.max_retry_count { + return Ok(Some((FuotaJob::Complete, Utc::now()))); + } + + info!("Delete multicast group"); + self.job.attempt_count += 1; + + multicast::delete(&self.fuota_deployment.id).await?; + + Ok(Some((FuotaJob::Complete, Utc::now()))) + } + async fn complete(&mut self) -> Result)>> { // Proceed with next step after reaching the max attempts. if self.job.attempt_count > self.job.max_retry_count { diff --git a/chirpstack/src/storage/fields/fuota.rs b/chirpstack/src/storage/fields/fuota.rs index d1cbe83c..ec3bf937 100644 --- a/chirpstack/src/storage/fields/fuota.rs +++ b/chirpstack/src/storage/fields/fuota.rs @@ -91,6 +91,7 @@ pub enum FuotaJob { FragSessionSetup, Enqueue, FragStatus, + DeleteMcGroup, Complete, } @@ -111,6 +112,7 @@ impl From<&FuotaJob> for String { FuotaJob::FragSessionSetup => "FRAG_SESSION_SETUP", FuotaJob::Enqueue => "ENQUEUE", FuotaJob::FragStatus => "FRAG_STATUS", + FuotaJob::DeleteMcGroup => "DELETE_MC_GROUP", FuotaJob::Complete => "COMPLETE", } .to_string() @@ -130,6 +132,7 @@ impl TryFrom<&str> for FuotaJob { "FRAG_SESSION_SETUP" => Self::FragSessionSetup, "ENQUEUE" => Self::Enqueue, "FRAG_STATUS" => Self::FragStatus, + "DELETE_MC_GROUP" => Self::DeleteMcGroup, "COMPLETE" => Self::Complete, _ => return Err(anyhow!("Invalid FuotaJob value: {}", value)), }) diff --git a/chirpstack/src/storage/fuota.rs b/chirpstack/src/storage/fuota.rs index 042c33fb..8890054c 100644 --- a/chirpstack/src/storage/fuota.rs +++ b/chirpstack/src/storage/fuota.rs @@ -653,7 +653,7 @@ pub async fn list_jobs(fuota_deployment_id: Uuid) -> Result