From 5bbd71ab3a43515a909a9cfb3b2c0b6084946241 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Wed, 19 Mar 2025 14:47:47 +0000 Subject: [PATCH] Add warnings to fuota deployment job + UI. In case some devices do not complete a job, this makes it possible to show a warning in the UI showing the amount of devices that did not complete the job. --- api/proto/api/fuota.proto | 5 +- api/rust/proto/chirpstack/api/fuota.proto | 5 +- .../up.sql | 3 + .../up.sql | 3 + chirpstack/src/api/fuota.rs | 1 + chirpstack/src/applayer/fuota/flow.rs | 289 ++++++++++++------ chirpstack/src/storage/fuota.rs | 9 + chirpstack/src/storage/schema_postgres.rs | 3 + chirpstack/src/storage/schema_sqlite.rs | 3 + .../views/fuota/FuotaDeploymentDashboard.tsx | 6 + 10 files changed, 232 insertions(+), 95 deletions(-) diff --git a/api/proto/api/fuota.proto b/api/proto/api/fuota.proto index 421d308c..4dd28fd9 100644 --- a/api/proto/api/fuota.proto +++ b/api/proto/api/fuota.proto @@ -393,6 +393,9 @@ message FuotaDeploymentJob { // Scheduler run after. google.protobuf.Timestamp scheduler_run_after = 6; + // Warning message. + string warning_msg = 7; + // Error message. - string error_msg = 7; + string error_msg = 8; } diff --git a/api/rust/proto/chirpstack/api/fuota.proto b/api/rust/proto/chirpstack/api/fuota.proto index 421d308c..4dd28fd9 100644 --- a/api/rust/proto/chirpstack/api/fuota.proto +++ b/api/rust/proto/chirpstack/api/fuota.proto @@ -393,6 +393,9 @@ message FuotaDeploymentJob { // Scheduler run after. google.protobuf.Timestamp scheduler_run_after = 6; + // Warning message. + string warning_msg = 7; + // Error message. - string error_msg = 7; + string error_msg = 8; } diff --git a/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql b/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql index 22f03e72..87390a97 100644 --- a/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql +++ b/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql @@ -15,6 +15,8 @@ create table fuota_deployment ( multicast_class_b_ping_slot_nb_k smallint not null, multicast_frequency bigint not null, multicast_timeout smallint not null, + multicast_session_start timestamp with time zone null, + multicast_session_end timestamp with time zone null, unicast_max_retry_count smallint not null, fragmentation_fragment_size smallint not null, fragmentation_redundancy_percentage smallint not null, @@ -57,6 +59,7 @@ create table fuota_deployment_job ( max_retry_count smallint not null, attempt_count smallint not null, scheduler_run_after timestamp with time zone not null, + warning_msg text not null, error_msg text not null, primary key (fuota_deployment_id, job) diff --git a/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql b/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql index 0f7f006b..d4642388 100644 --- a/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql +++ b/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql @@ -15,6 +15,8 @@ create table fuota_deployment ( multicast_class_b_ping_slot_nb_k smallint not null, multicast_frequency bigint not null, multicast_timeout smallint not null, + multicast_session_start datetime null, + multicast_session_end datetime null, unicast_max_retry_count smallint not null, fragmentation_fragment_size smallint not null, fragmentation_redundancy_percentage smallint not null, @@ -57,6 +59,7 @@ create table fuota_deployment_job ( max_retry_count smallint not null, attempt_count smallint not null, scheduler_run_after datetime not null, + warning_msg text not null, error_msg text not null, primary key (fuota_deployment_id, job) diff --git a/chirpstack/src/api/fuota.rs b/chirpstack/src/api/fuota.rs index 922f9a89..c7355ec9 100644 --- a/chirpstack/src/api/fuota.rs +++ b/chirpstack/src/api/fuota.rs @@ -630,6 +630,7 @@ impl FuotaService for Fuota { scheduler_run_after: Some(helpers::datetime_to_prost_timestamp( &j.scheduler_run_after, )), + warning_msg: j.warning_msg.clone(), error_msg: j.error_msg.clone(), }) .collect(), diff --git a/chirpstack/src/applayer/fuota/flow.rs b/chirpstack/src/applayer/fuota/flow.rs index 35984175..0e368ebd 100644 --- a/chirpstack/src/applayer/fuota/flow.rs +++ b/chirpstack/src/applayer/fuota/flow.rs @@ -197,27 +197,38 @@ impl Flow { } async fn multicast_group_setup(&mut self) -> Result)>> { - // Proceed with next step after reaching the max attempts. - if self.job.attempt_count > self.job.max_retry_count { - return Ok(Some((FuotaJob::FragSessionSetup, Utc::now()))); - } - - info!("Sending McGroupSetupReq commands to devices"); - self.job.attempt_count += 1; - let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; - // Filter on devices that have not completed the McGroupSetup. let fuota_devices: Vec = fuota_devices .into_iter() .filter(|d| d.mc_group_setup_completed_at.is_none()) .collect(); - if fuota_devices.is_empty() { - self.job.error_msg = "There are no devices available to complete this step".into(); - return Ok(None); + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count > self.job.max_retry_count { + info!("Set timeout error to devices that did not respond to McGroupSetupReq"); + fuota::set_device_timeout_error( + self.fuota_deployment.id.into(), + true, + false, + false, + false, + ) + .await?; + + if !fuota_devices.is_empty() { + self.job.warning_msg = format!( + "{} devices did not complete the multicast group setup", + fuota_devices.len() + ); + } + + return Ok(Some((FuotaJob::FragSessionSetup, Utc::now()))); } + info!("Sending McGroupSetupReq commands to devices"); + self.job.attempt_count += 1; + for fuota_dev in &fuota_devices { let dev_keys = device_keys::get(&fuota_dev.dev_eui).await?; @@ -316,25 +327,11 @@ impl Flow { } async fn fragmentation_session_setup(&mut self) -> Result)>> { - // Proceed with next step after reaching the max attempts. - if self.job.attempt_count > self.job.max_retry_count { - return Ok(Some((FuotaJob::McSession, Utc::now()))); - } - - info!("Set timeout error to devices that did not respond to McGroupSetupReq"); - fuota::set_device_timeout_error(self.fuota_deployment.id.into(), true, false, false, false) - .await?; - - info!("Sending FragSessionSetupReq commands to devices"); - self.job.attempt_count += 1; - - let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize; - let fragments = - (self.fuota_deployment.payload.len() as f32 / fragment_size as f32).ceil() as usize; - let padding = - (fragment_size - (self.fuota_deployment.payload.len() % fragment_size)) % fragment_size; - let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + let fuota_devices_completed_mc_group_setup_count = fuota_devices + .iter() + .filter(|d| d.mc_group_setup_completed_at.is_some()) + .count(); // Filter on devices that have completed the previous step, but not yet the FragSessionSetup. let fuota_devices: Vec = fuota_devices @@ -345,11 +342,41 @@ impl Flow { }) .collect(); - if fuota_devices.is_empty() { + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count > self.job.max_retry_count { + info!("Set timeout error to devices that did not respond to FragSessionSetupReq"); + fuota::set_device_timeout_error( + self.fuota_deployment.id.into(), + false, + false, + true, + false, + ) + .await?; + + if !fuota_devices.is_empty() { + self.job.warning_msg = format!( + "{} devices did not complete the fragmentation session setup", + fuota_devices.len() + ); + } + return Ok(Some((FuotaJob::McSession, Utc::now()))); + } + + info!("Sending FragSessionSetupReq commands to devices"); + self.job.attempt_count += 1; + + if fuota_devices_completed_mc_group_setup_count == 0 { self.job.error_msg = "There are no devices available to complete this step".into(); return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); } + let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize; + let fragments = + (self.fuota_deployment.payload.len() as f32 / fragment_size as f32).ceil() as usize; + let padding = + (fragment_size - (self.fuota_deployment.payload.len() % fragment_size)) % fragment_size; + for fuota_dev in &fuota_devices { let pl = match self.device_profile.app_layer_params.ts004_version { Some(Ts004Version::V100) => fragmentation::v1::Payload::FragSessionSetupReq( @@ -436,19 +463,11 @@ impl Flow { } async fn multicast_session_setup(&mut self) -> Result)>> { - // Proceed with next step after reaching the max attempts. - if self.job.attempt_count > self.job.max_retry_count { - return Ok(Some((FuotaJob::Enqueue, Utc::now()))); - } - - info!("Set timeout error to devices that did not respond to FragSessionSetupReq"); - fuota::set_device_timeout_error(self.fuota_deployment.id.into(), false, false, true, false) - .await?; - - info!("Sending McClassB/McClassCSessionReq commands to devices"); - self.job.attempt_count += 1; - let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + let fuota_devices_completed_frag_session_setup_count = fuota_devices + .iter() + .filter(|d| d.frag_session_setup_completed_at.is_some()) + .count(); // Filter on devices that have completed the previous step, but not yet the McSession. let fuota_devices: Vec = fuota_devices @@ -458,23 +477,79 @@ impl Flow { }) .collect(); - if fuota_devices.is_empty() { + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count > self.job.max_retry_count { + info!("Set timeout error to devices that did not respond to McSessionReq"); + fuota::set_device_timeout_error( + self.fuota_deployment.id.into(), + false, + true, + false, + false, + ) + .await?; + + if !fuota_devices.is_empty() { + self.job.warning_msg = format!( + "{} devices did not complete the multicast session setup", + fuota_devices.len() + ); + } + + return Ok(Some(( + FuotaJob::Enqueue, + self.fuota_deployment + .multicast_session_start + .unwrap_or_else(|| Utc::now()), + ))); + } + + info!("Sending McClassB/McClassCSessionReq commands to devices"); + self.job.attempt_count += 1; + + if fuota_devices_completed_frag_session_setup_count == 0 { self.job.error_msg = "There are no devices available to complete this step".into(); return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); } - for fuota_dev in &fuota_devices { + // Calculate the session start and end dates the first time this job is executed. + if self.fuota_deployment.multicast_session_start.is_none() + && self.fuota_deployment.multicast_session_end.is_none() + { // We want to start the session (retry_count + 1) x the uplink_interval. // Note that retry_count=0 means only one attempt. - let session_start = (Utc::now() + let session_start = Utc::now() + TimeDelta::seconds( (self.job.max_retry_count as i64 + 1) * self.device_profile.uplink_interval as i64, - )) + ); + + let session_end = { + let timeout = match self.fuota_deployment.multicast_group_type.as_ref() { + "B" => Duration::from_secs( + 128 * (1 << self.fuota_deployment.multicast_timeout as u64), + ), + "C" => Duration::from_secs(1 << self.fuota_deployment.multicast_timeout as u64), + _ => return Err(anyhow!("Invalid multicast-group type")), + }; + + session_start + timeout + }; + + self.fuota_deployment.multicast_session_start = Some(session_start); + self.fuota_deployment.multicast_session_end = Some(session_end); + self.fuota_deployment = fuota::update_deployment(self.fuota_deployment.clone()).await?; + } + + let session_start = self + .fuota_deployment + .multicast_session_start + .ok_or_else(|| anyhow!("multicast_session_start is None"))? .to_gps_time() .num_seconds() - % (1 << 32); + % (1 << 32); + for fuota_dev in &fuota_devices { let pl = match self.device_profile.app_layer_params.ts005_version { Some(Ts005Version::V100) => { match self.fuota_deployment.multicast_group_type.as_ref() { @@ -572,12 +647,19 @@ impl Flow { .await?; } - // In this case we need to exactly try the max. attempts, because this is what the - // session-start time calculation is based on. If we continue with enqueueing too - // early, the multicast-session hasn't started yet. - let scheduler_run_after = - Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64); - Ok(Some((FuotaJob::McSession, scheduler_run_after))) + if !fuota_devices.is_empty() { + // There are devices pending setup, we need to re-run this job. + let scheduler_run_after = + Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64); + Ok(Some((FuotaJob::McSession, scheduler_run_after))) + } else { + Ok(Some(( + FuotaJob::Enqueue, + self.fuota_deployment + .multicast_session_start + .unwrap_or_else(|| Utc::now()), + ))) + } } async fn enqueue(&mut self) -> Result)>> { @@ -586,16 +668,12 @@ impl Flow { return Ok(Some((FuotaJob::FragStatus, Utc::now()))); } - info!("Set timeout error to devices that did not respond to McSessionReq"); - fuota::set_device_timeout_error(self.fuota_deployment.id.into(), false, true, false, false) - .await?; - info!("Enqueueing fragmented payload to multicast group"); self.job.attempt_count += 1; let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; - // Filter on devices that have completed the previous step, but not yet the McSession. + // Filter on devices that have completed the previous step. let fuota_devices: Vec = fuota_devices .into_iter() .filter(|d| d.mc_session_completed_at.is_some()) @@ -674,35 +752,30 @@ impl Flow { } match self.fuota_deployment.request_fragmentation_session_status { - RequestFragmentationSessionStatus::NoRequest => { - Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))) - } + RequestFragmentationSessionStatus::NoRequest => Ok(Some(( + FuotaJob::DeleteMcGroup, + self.fuota_deployment + .multicast_session_end + .unwrap_or_else(|| Utc::now()), + ))), RequestFragmentationSessionStatus::AfterFragEnqueue => { Ok(Some((FuotaJob::FragStatus, Utc::now()))) } - RequestFragmentationSessionStatus::AfterSessTimeout => { - let timeout = match self.fuota_deployment.multicast_group_type.as_ref() { - "B" => Duration::from_secs( - 128 * (1 << self.fuota_deployment.multicast_timeout as u64), - ), - "C" => Duration::from_secs(1 << self.fuota_deployment.multicast_timeout as u64), - _ => return Err(anyhow!("Invalid multicast-group type")), - }; - Ok(Some((FuotaJob::FragStatus, Utc::now() + timeout))) - } + RequestFragmentationSessionStatus::AfterSessTimeout => Ok(Some(( + FuotaJob::FragStatus, + self.fuota_deployment + .multicast_session_end + .unwrap_or_else(|| Utc::now()), + ))), } } async fn fragmentation_status(&mut self) -> Result)>> { - // Proceed with next step after reaching the max attempts. - if self.job.attempt_count > self.job.max_retry_count { - return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); - } - - info!("Enqueue FragSessionStatusReq"); - self.job.attempt_count += 1; - let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + let fuota_devices_completed_mc_session_count = fuota_devices + .iter() + .filter(|d| d.mc_session_completed_at.is_some()) + .count(); // Filter on devices that have completed the multicast-session setup but // not yet responded to the FragSessionStatusReq. @@ -711,7 +784,32 @@ impl Flow { .filter(|d| d.mc_session_completed_at.is_some() && d.frag_status_completed_at.is_none()) .collect(); - if fuota_devices.is_empty() { + // Proceed with next step after reaching the max attempts. + if self.job.attempt_count > self.job.max_retry_count { + info!("Set timeout error to devices that did not respond to FragSessionStatusReq"); + fuota::set_device_timeout_error( + self.fuota_deployment.id.into(), + false, + false, + false, + true, + ) + .await?; + + if !fuota_devices.is_empty() { + self.job.warning_msg = format!( + "{} devices did not complete the fragmentation status", + fuota_devices.len() + ); + } + + return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); + } + + info!("Enqueue FragSessionStatusReq"); + self.job.attempt_count += 1; + + if fuota_devices_completed_mc_session_count == 0 { self.job.error_msg = "There are no devices available to complete this step".into(); return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))); } @@ -750,7 +848,12 @@ impl Flow { Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64); Ok(Some((FuotaJob::FragStatus, scheduler_run_after))) } else { - Ok(Some((FuotaJob::DeleteMcGroup, Utc::now()))) + Ok(Some(( + FuotaJob::DeleteMcGroup, + self.fuota_deployment + .multicast_session_end + .unwrap_or_else(|| Utc::now()), + ))) } } @@ -785,21 +888,15 @@ impl Flow { } else { fuota::set_device_completed(self.fuota_deployment.id.into(), true, true, true, true) .await?; - fuota::set_device_timeout_error( - self.fuota_deployment.id.into(), - false, - false, - false, - true, - ) - .await?; } let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?; + let fuota_devices_count = fuota_devices.len(); let fuota_devices: Vec = fuota_devices .into_iter() .filter(|d| d.completed_at.is_some() && d.error_msg.is_empty()) .collect(); + let fuota_devices_completed_count = fuota_devices.len(); for fuota_device in &fuota_devices { let mut d = device::get(&fuota_device.dev_eui).await?; @@ -809,9 +906,15 @@ impl Flow { let _ = device::update(d).await?; } - let mut d = self.fuota_deployment.clone(); - d.completed_at = Some(Utc::now()); - let _ = fuota::update_deployment(d).await?; + if fuota_devices_count != fuota_devices_completed_count { + self.job.warning_msg = format!( + "{} devices did not complete the FUOTA deployment", + fuota_devices_count - fuota_devices_completed_count + ); + } + + self.fuota_deployment.completed_at = Some(Utc::now()); + self.fuota_deployment = fuota::update_deployment(self.fuota_deployment.clone()).await?; Ok(None) } diff --git a/chirpstack/src/storage/fuota.rs b/chirpstack/src/storage/fuota.rs index 13f06657..100e5028 100644 --- a/chirpstack/src/storage/fuota.rs +++ b/chirpstack/src/storage/fuota.rs @@ -36,6 +36,8 @@ pub struct FuotaDeployment { pub multicast_class_b_ping_slot_nb_k: i16, pub multicast_frequency: i64, pub multicast_timeout: i16, + pub multicast_session_start: Option>, + pub multicast_session_end: Option>, pub unicast_max_retry_count: i16, pub fragmentation_fragment_size: i16, pub fragmentation_redundancy_percentage: i16, @@ -69,6 +71,8 @@ impl Default for FuotaDeployment { multicast_class_b_ping_slot_nb_k: 0, multicast_frequency: 0, multicast_timeout: 0, + multicast_session_start: None, + multicast_session_end: None, unicast_max_retry_count: 0, fragmentation_fragment_size: 0, fragmentation_redundancy_percentage: 0, @@ -154,6 +158,7 @@ pub struct FuotaDeploymentJob { pub max_retry_count: i16, pub attempt_count: i16, pub scheduler_run_after: DateTime, + pub warning_msg: String, pub error_msg: String, } @@ -169,6 +174,7 @@ impl Default for FuotaDeploymentJob { max_retry_count: 0, attempt_count: 0, scheduler_run_after: now, + warning_msg: "".into(), error_msg: "".into(), } } @@ -220,6 +226,8 @@ pub async fn update_deployment(d: FuotaDeployment) -> Result Result Int2, multicast_frequency -> Int8, multicast_timeout -> Int2, + multicast_session_start -> Nullable, + multicast_session_end -> Nullable, unicast_max_retry_count -> Int2, fragmentation_fragment_size -> Int2, fragmentation_redundancy_percentage -> Int2, @@ -249,6 +251,7 @@ diesel::table! { max_retry_count -> Int2, attempt_count -> Int2, scheduler_run_after -> Timestamptz, + warning_msg -> Text, error_msg -> Text, } } diff --git a/chirpstack/src/storage/schema_sqlite.rs b/chirpstack/src/storage/schema_sqlite.rs index 1f3ae811..74ef1b72 100644 --- a/chirpstack/src/storage/schema_sqlite.rs +++ b/chirpstack/src/storage/schema_sqlite.rs @@ -180,6 +180,8 @@ diesel::table! { multicast_class_b_ping_slot_nb_k -> SmallInt, multicast_frequency -> BigInt, multicast_timeout -> SmallInt, + multicast_session_start -> Nullable, + multicast_session_end -> Nullable, unicast_max_retry_count -> SmallInt, fragmentation_fragment_size -> SmallInt, fragmentation_redundancy_percentage -> SmallInt, @@ -224,6 +226,7 @@ diesel::table! { max_retry_count -> SmallInt, attempt_count -> SmallInt, scheduler_run_after -> TimestamptzSqlite, + warning_msg -> Text, error_msg -> Text, } } diff --git a/ui/src/views/fuota/FuotaDeploymentDashboard.tsx b/ui/src/views/fuota/FuotaDeploymentDashboard.tsx index 88f473ec..0f6fa3f2 100644 --- a/ui/src/views/fuota/FuotaDeploymentDashboard.tsx +++ b/ui/src/views/fuota/FuotaDeploymentDashboard.tsx @@ -63,6 +63,12 @@ function FuotaDeploymentDashboard(props: IProps) { ); } else if (!record.completedAt) { return } size="small" />; + } else if (record.warningMsg !== "") { + return ( + + warning + + ); } else { return ok; }