Error if there are no fuota devices + cleanup mc group.

In case there are no fuota devices (e.g. all devices failed the previous
step), this will log a warning and the flow will continue with multicast
cleanup and completion steps.
This commit is contained in:
Orne Brocaar 2025-03-05 16:33:51 +00:00
parent 71cc1aca74
commit bbdf2dd781
4 changed files with 56 additions and 4 deletions

View File

@ -47,6 +47,7 @@ impl Flow {
FuotaJob::McSession => self.multicast_session_setup().await,
FuotaJob::Enqueue => self.enqueue().await,
FuotaJob::FragStatus => self.fragmentation_status().await,
FuotaJob::DeleteMcGroup => self.delete_mc_group().await,
FuotaJob::Complete => self.complete().await,
};
@ -192,6 +193,11 @@ impl Flow {
.filter(|d| d.mc_group_setup_completed_at.is_none())
.collect();
if fuota_devices.is_empty() {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(None);
}
for fuota_dev in &fuota_devices {
let dev_keys = device_keys::get(&fuota_dev.dev_eui).await?;
let mc_root_key = match self.device_profile.mac_version {
@ -272,6 +278,11 @@ impl Flow {
})
.collect();
if fuota_devices.is_empty() {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
for fuota_dev in &fuota_devices {
let pl = fragmentation::v1::Payload::FragSessionSetupReq(
fragmentation::v1::FragSessionSetupReqPayload {
@ -333,6 +344,11 @@ impl Flow {
})
.collect();
if fuota_devices.is_empty() {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
for fuota_dev in &fuota_devices {
// We want to start the session (retry_count + 1) x the uplink_interval.
// Note that retry_count=0 means only one attempt.
@ -416,6 +432,19 @@ impl Flow {
info!("Enqueueing fragmented payload to multicast group");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
// Filter on devices that have completed the previous step, but not yet the McSession.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
.into_iter()
.filter(|d| d.mc_session_completed_at.is_some())
.collect();
if fuota_devices.is_empty() {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
let payload_length = self.fuota_deployment.payload.len();
let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize;
let padding = (fragment_size - (payload_length % fragment_size)) % fragment_size;
@ -452,7 +481,7 @@ impl Flow {
match self.fuota_deployment.request_fragmentation_session_status {
RequestFragmentationSessionStatus::NoRequest => {
Ok(Some((FuotaJob::Complete, Utc::now())))
Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())))
}
RequestFragmentationSessionStatus::AfterFragEnqueue => {
Ok(Some((FuotaJob::FragStatus, Utc::now())))
@ -473,7 +502,7 @@ impl Flow {
async fn fragmentation_status(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::Complete, Utc::now())));
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
info!("Enqueue FragSessionStatusReq");
@ -488,6 +517,11 @@ impl Flow {
.filter(|d| d.mc_session_completed_at.is_some() && d.frag_status_completed_at.is_none())
.collect();
if fuota_devices.is_empty() {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
for fuota_dev in &fuota_devices {
let pl = fragmentation::v1::Payload::FragSessionStatusReq(
fragmentation::v1::FragSessionStatusReqPayload {
@ -511,10 +545,24 @@ impl Flow {
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
Ok(Some((FuotaJob::FragStatus, scheduler_run_after)))
} else {
Ok(Some((FuotaJob::Complete, Utc::now())))
Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())))
}
}
async fn delete_mc_group(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::Complete, Utc::now())));
}
info!("Delete multicast group");
self.job.attempt_count += 1;
multicast::delete(&self.fuota_deployment.id).await?;
Ok(Some((FuotaJob::Complete, Utc::now())))
}
async fn complete(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {

View File

@ -91,6 +91,7 @@ pub enum FuotaJob {
FragSessionSetup,
Enqueue,
FragStatus,
DeleteMcGroup,
Complete,
}
@ -111,6 +112,7 @@ impl From<&FuotaJob> for String {
FuotaJob::FragSessionSetup => "FRAG_SESSION_SETUP",
FuotaJob::Enqueue => "ENQUEUE",
FuotaJob::FragStatus => "FRAG_STATUS",
FuotaJob::DeleteMcGroup => "DELETE_MC_GROUP",
FuotaJob::Complete => "COMPLETE",
}
.to_string()
@ -130,6 +132,7 @@ impl TryFrom<&str> for FuotaJob {
"FRAG_SESSION_SETUP" => Self::FragSessionSetup,
"ENQUEUE" => Self::Enqueue,
"FRAG_STATUS" => Self::FragStatus,
"DELETE_MC_GROUP" => Self::DeleteMcGroup,
"COMPLETE" => Self::Complete,
_ => return Err(anyhow!("Invalid FuotaJob value: {}", value)),
})

View File

@ -653,7 +653,7 @@ pub async fn list_jobs(fuota_deployment_id: Uuid) -> Result<Vec<FuotaDeploymentJ
fuota_deployment_job::dsl::fuota_deployment_id
.eq(fields::Uuid::from(fuota_deployment_id)),
)
.order_by(fuota_deployment_job::dsl::scheduler_run_after)
.order_by(fuota_deployment_job::dsl::created_at)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, fuota_deployment_id.to_string()))

View File

@ -45,6 +45,7 @@ function FuotaDeploymentDashboard(props: IProps) {
MC_SESSION: "Multicast session setup",
ENQUEUE: "Enqueue fragments",
FRAG_STATUS: "Request fragmentation status",
DELETE_MC_GROUP: "Delete multicast group",
COMPLETE: "Complete deployment",
};